fair-acc / gnuradio4

Prototype implementations for a more compile-time efficient flowgraph API
GNU Lesser General Public License v3.0
22 stars 8 forks source link

Unexpected tag-handling with multi-threaded scheduler #347

Closed daniestevez closed 3 weeks ago

daniestevez commented 1 month ago

I have a simple example that works as expected with a gr::scheduler::Simple<gr::scheduler::ExecutionPolicy::singleThreaded> scheduler, but fails with a gr::scheduler::Simple<gr::scheduler::ExecutionPolicy::multiThreaded> scheduler due to unexpected tag propagation.

The flowgraph is Vector Source -> CRC Append -> Null Sink. The Vector source generates repeatedly 1500 bytes with a {"packet_len", 1500} tag in the first item of each repetition. The CRC Append block expects a stream of packets delimited with tags in this way at the input, and produces a stream of packets also delimited by tags by appending a CRC-32 to each packet (so the output packets are 1504 bytes long.

When run with the single-threaded scheduler, the flowgraph runs forever and doesn't fail (the example should be built with cmake -D CMAKE_CXX_FLAGS=-DTRACE to enable prints in the processBulk() function of each block). When run with the multi-threaded scheduler, CRC Append gives an error because it doesn't find the "packet_len" tag at the expected location. The output looks like this:

gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::VectorSource<unsigned char>::processBulk(outSpan.size() = 65536)
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = true
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::VectorSource<unsigned char>::processBulk returning OK
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 1500, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::VectorSource<unsigned char>::processBulk(outSpan.size() = 1500)
gr::packet_modem::VectorSource<unsigned char>::processBulk returning OK
libc++abi: terminating due to uncaught exception of type gr::exception: scheduler gr::scheduler::Simple<(gr::scheduler::ExecutionPolicy)1, gr::profiling::null::Profiler>: throwing ignored exception 2024-05-23T07:48:11.641: /gr4-packet-modem/blocks/include/gnuradio-4.0/packet-modem/crc_append.hpp:112:17: auto gr::packet_modem::CrcAppend<>::processBulk(const gr::CircularBuffer<unsigned char>::ConsumableInputRange<unsigned char, gr::SpanReleasePolicy::ProcessAll> &, gr::CircularBuffer<unsigned char>::PublishableOutputRange<> &)::(anonymous class)::operator()() const [CrcType = unsigned long]: expected packet-length tag not found in method: auto gr::packet_modem::CrcAppend<>::processBulk(const gr::CircularBuffer<unsigned char>::ConsumableInputRange<unsigned char, gr::SpanReleasePolicy::ProcessAll> &, gr::CircularBuffer<unsigned char>::PublishableOutputRange<> &)::(anonymous class)::operator()() const [CrcType = unsigned long] at /gr4-packet-modem/gnuradio4/core/include/gnuradio-4.0/Scheduler.hpp:133
Aborted (core dumped)

The problem is that the "packet-len" tag is present in a processBulk() call which has inSpan.size() = 0, but it is not present afterwards:

[...]
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = true
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 0, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false
[...]
gr::packet_modem::CrcAppend<unsigned long>::processBulk(inSpan.size() = 1500, outSpan.size = 65536), d_data_remaining = 0, d_crc_remaining = 0, input_tags_present() = false

This is unexpected for me, since I wouldn't expect tags to be present when inSpan.size() is zero. I would expect the tag and its "associated item" to travel together, and only have the tag present if the inSpan.size() > 0 and the associated item is the first one in inSpan.

wirew0rm commented 1 month ago

This is basically the same issue as the one that is blocking the DataSet sink. Tags are unconditionally consumed not considering if they were applied at all. Unfortunately fixing it had some knock on effects to be taken care of.

daniestevez commented 1 month ago

I've also come up with a case where using the single-threaded scheduler, a tag is output in the first item published by a block, but the tag is not present in the processBulk() call of the following block in which these items are present in inSpan.

I can try to make a small example if this is not part of the same known problem.

daniestevez commented 3 weeks ago

I confirm that this particular example is now working with the latest gnuradio4 main.