eclipse-cyclonedds / cyclonedds

Eclipse Cyclone DDS project
https://projects.eclipse.org/projects/iot.cyclonedds

Question regarding wake up packets for terminating dds internal threads #1818

Open cfveeden opened 1 year ago

cfveeden commented 1 year ago

I've been looking into this issue and I have a question. Before I get to my question, I need to explain the test setup.

To avoid having to install anything extra or requiring a specific network setup, I set up the following test.

I create a dummy interface with the following:

ip link add dummy0 type dummy
ip addr add 192.168.0.223/24 dev dummy0
ip link set dummy0 multicast on

Then I do an export CYCLONEDDS_URI=file://... to point cyclone to my XML setup that uses that interface:

<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="https://cdds.io/config https://raw.githubusercontent.com/eclipse-cyclonedds/cyclonedds/master/etc/cyclonedds.xsd">
  <Domain id="0">
    <General>
      <Interfaces>
        <NetworkInterface name="dummy0"/>
      </Interfaces>
    </General>
  </Domain>
</CycloneDDS>

I start a publisher and send SIGINT to the terminal, and it hangs:

myuser@MYHOSTNAME:~$ ros2 topic pub /hello std_msgs/String data:\ \'hi\'\ 
publisher: beginning loop
publishing #1: std_msgs.msg.String(data='hi')

publishing #2: std_msgs.msg.String(data='hi')

publishing #3: std_msgs.msg.String(data='hi')

^C

Granted, the network setup is probably wrong due to the dummy interface I created, but that doesn't really matter for the question I'm working towards; I've also tested on real interfaces and seen similar behaviour. I've seen this comment ascribing this sort of behaviour to the use of multiple threads for receive queues, so I checked the open ports using lsof, grepping for UDP. With some edits, the output looks like this:

# lsof | grep `pgrep ros2`
COMMAND      PID    TID TASKCMD                  USER   FD      TYPE             DEVICE  SIZE/OFF       NODE NAME
...
ros2      104585                             myuser    4u     IPv4            7603001       0t0        UDP *:54676
ros2      104585                             myuser    5u     IPv4            7603002       0t0        UDP *:7400 
ros2      104585                             myuser    6u     IPv4            7603003       0t0        UDP *:7401 
ros2      104585                             myuser    7u     IPv4            7603004       0t0        UDP MYHOSTNAME:59469
ros2      104585 104586 ros2                 myuser    4u     IPv4            7603001       0t0        UDP *:54676 
ros2      104585 104586 ros2                 myuser    5u     IPv4            7603002       0t0        UDP *:7400 
ros2      104585 104586 ros2                 myuser    6u     IPv4            7603003       0t0        UDP *:7401 
ros2      104585 104586 ros2                 myuser    7u     IPv4            7603004       0t0        UDP MYHOSTNAME:59469
ros2      104585 104587 gc                   myuser    4u     IPv4            7603001       0t0        UDP *:54676 
ros2      104585 104587 gc                   myuser    5u     IPv4            7603002       0t0        UDP *:7400 
ros2      104585 104587 gc                   myuser    6u     IPv4            7603003       0t0        UDP *:7401 
ros2      104585 104587 gc                   myuser    7u     IPv4            7603004       0t0        UDP MYHOSTNAME:59469
ros2      104585 104588 dq.builti            myuser    4u     IPv4            7603001       0t0        UDP *:54676 
ros2      104585 104588 dq.builti            myuser    5u     IPv4            7603002       0t0        UDP *:7400 
ros2      104585 104588 dq.builti            myuser    6u     IPv4            7603003       0t0        UDP *:7401 
ros2      104585 104588 dq.builti            myuser    7u     IPv4            7603004       0t0        UDP MYHOSTNAME:59469
ros2      104585 104589 dq.user              myuser    4u     IPv4            7603001       0t0        UDP *:54676 
ros2      104585 104589 dq.user              myuser    5u     IPv4            7603002       0t0        UDP *:7400 
ros2      104585 104589 dq.user              myuser    6u     IPv4            7603003       0t0        UDP *:7401 
ros2      104585 104589 dq.user              myuser    7u     IPv4            7603004       0t0        UDP MYHOSTNAME:59469
ros2      104585 104590 tev                  myuser    4u     IPv4            7603001       0t0        UDP *:54676 
ros2      104585 104590 tev                  myuser    5u     IPv4            7603002       0t0        UDP *:7400 
ros2      104585 104590 tev                  myuser    6u     IPv4            7603003       0t0        UDP *:7401 
ros2      104585 104590 tev                  myuser    7u     IPv4            7603004       0t0        UDP MYHOSTNAME:59469
ros2      104585 104591 recv                 myuser    4u     IPv4            7603001       0t0        UDP *:54676 
ros2      104585 104591 recv                 myuser    5u     IPv4            7603002       0t0        UDP *:7400 
ros2      104585 104591 recv                 myuser    6u     IPv4            7603003       0t0        UDP *:7401 
ros2      104585 104591 recv                 myuser    7u     IPv4            7603004       0t0        UDP MYHOSTNAME:59469
ros2      104585 104592 recvMC               myuser    4u     IPv4            7603001       0t0        UDP *:54676 
ros2      104585 104592 recvMC               myuser    5u     IPv4            7603002       0t0        UDP *:7400 
ros2      104585 104592 recvMC               myuser    6u     IPv4            7603003       0t0        UDP *:7401 
ros2      104585 104592 recvMC               myuser    7u     IPv4            7603004       0t0        UDP MYHOSTNAME:59469
ros2      104585 104593 recvUC               myuser    4u     IPv4            7603001       0t0        UDP *:54676 
ros2      104585 104593 recvUC               myuser    5u     IPv4            7603002       0t0        UDP *:7400 
ros2      104585 104593 recvUC               myuser    6u     IPv4            7603003       0t0        UDP *:7401 
ros2      104585 104593 recvUC               myuser    7u     IPv4            7603004       0t0        UDP MYHOSTNAME:59469
ros2      104585 104594 ros2                 myuser    4u     IPv4            7603001       0t0        UDP *:54676 
ros2      104585 104594 ros2                 myuser    5u     IPv4            7603002       0t0        UDP *:7400 
ros2      104585 104594 ros2                 myuser    6u     IPv4            7603003       0t0        UDP *:7401 
ros2      104585 104594 ros2                 myuser    7u     IPv4            7603004       0t0        UDP MYHOSTNAME:59469

The hanging thread terminates when I write something arbitrary to port 7401 on localhost:

myuser@MYHOSTNAME:~$ netcat -u localhost 7401
a
^C
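
For anyone scripting this workaround: the netcat command boils down to sending a single datagram. A minimal C sketch of the same thing (port 7401 comes from the lsof output above and may differ between runs):

#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
  int fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0) {
    perror("socket");
    return 1;
  }

  struct sockaddr_in dst;
  memset(&dst, 0, sizeof(dst));
  dst.sin_family = AF_INET;
  dst.sin_port = htons(7401);                   /* port from the lsof output above */
  dst.sin_addr.s_addr = htonl(INADDR_LOOPBACK); /* 127.0.0.1 */

  /* The payload is irrelevant: any datagram unblocks the recv thread. */
  if (sendto(fd, "a", 1, 0, (struct sockaddr *)&dst, sizeof(dst)) < 0)
    perror("sendto");

  close(fd);
  return 0;
}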

If I repeat the test, but take the interface down while it is running with ip link set dummy0 down, I see the following:

myuser@MYHOSTNAME:~$ ros2 topic pub /hello std_msgs/String data:\ \'hi\'\                                                                                                                 
publisher: beginning loop                                                                                                                                                                                          
publishing #1: std_msgs.msg.String(data='hi')                                                                                                                                                                      

publishing #2: std_msgs.msg.String(data='hi')                                                                                                                                                                      

publishing #3: std_msgs.msg.String(data='hi')

publishing #4: std_msgs.msg.String(data='hi')

publishing #5: std_msgs.msg.String(data='hi')

publishing #6: std_msgs.msg.String(data='hi')

publishing #7: std_msgs.msg.String(data='hi')

publishing #8: std_msgs.msg.String(data='hi')

1693407153.763565 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7400 failed with retcode -1
publishing #9: std_msgs.msg.String(data='hi')

publishing #10: std_msgs.msg.String(data='hi')

publishing #11: std_msgs.msg.String(data='hi')

publishing #12: std_msgs.msg.String(data='hi')

publishing #13: std_msgs.msg.String(data='hi')

publishing #14: std_msgs.msg.String(data='hi')

publishing #15: std_msgs.msg.String(data='hi')

^C1693407160.218745 [0]       ros2: ddsi_udp_conn_write to udp/239.255.0.1:7401 failed with retcode -1
1693407160.218805 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7400 failed with retcode -1
1693407161.218961 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7401 failed with retcode -1
1693407162.219112 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7401 failed with retcode -1
1693407163.219270 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7401 failed with retcode -1
1693407164.219409 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7401 failed with retcode -1
1693407165.219554 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7401 failed with retcode -1
1693407166.219698 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7401 failed with retcode -1
1693407167.219778 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7401 failed with retcode -1
1693407168.219967 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7401 failed with retcode -1

This is also recoverable with the same netcat command.

Finally, my question: would it not make more sense to send the "wake up" termination packet for the thread via localhost, instead of the interface that cyclone is bound to?

eboasson commented 12 months ago

Finally, my question: would it not make more sense to send the "wake up" termination packet for the thread via localhost, instead of the interface that cyclone is bound to?

That is a good question indeed. There are probably two things worth considering:

Something that works reliably with IPv4 would already be a great improvement, and I think your idea should work. Given the above, the easiest way would probably be to create a new socket with the outgoing multicast interface set to 127.0.0.1, and to do this only when using UDPv4. (Come to think of it, technically there's no guarantee of a loopback interface, nor that it uses 127.0.0.1, nor that 127.x.y.z is a loopback address. Oh well, we can always add an option to disable it for people that like to have a crazy setup.)
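
Roughly, the sketch I have in mind (not actual Cyclone DDS code; the group and port are the defaults visible in your logs):

#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Send one dummy datagram to the discovery multicast group, but routed
 * over the loopback address rather than the configured interface. */
static int send_wakeup(const char *group, uint16_t port)
{
  int fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0)
    return -1;

  /* Force outgoing multicast onto 127.0.0.1 instead of the interface
   * Cyclone is bound to, so it works even if that interface is down. */
  struct in_addr ifaddr = { .s_addr = htonl(INADDR_LOOPBACK) };
  if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_IF, &ifaddr, sizeof(ifaddr)) < 0) {
    close(fd);
    return -1;
  }

  struct sockaddr_in dst;
  memset(&dst, 0, sizeof(dst));
  dst.sin_family = AF_INET;
  dst.sin_port = htons(port);
  inet_pton(AF_INET, group, &dst.sin_addr);

  char buf = 0; /* payload is irrelevant; arrival is what unblocks recv */
  ssize_t n = sendto(fd, &buf, 1, 0, (struct sockaddr *)&dst, sizeof(dst));
  close(fd);
  return n < 0 ? -1 : 0;
}

int main(void)
{
  /* 239.255.0.1:7400 is the default discovery group/port seen in the logs. */
  if (send_wakeup("239.255.0.1", 7400) < 0)
    perror("send_wakeup");
  return 0;
}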

Finally, if you'd like any clarification on what I meant, just ask!

cfveeden commented 11 months ago

Thanks for the feedback and sorry for my delayed response. I have a workaround for the issues caused by this particular bug, so any additional work on this matter is optional for me too.

Everything you said in your previous post makes sense, though I'll admit that I did not dive deep into the work in progress that you linked.

Since using the loopback interface to terminate the threads is subject to so many caveats, it does not appear to be the best approach to take. I would like to hear your thoughts on other possible approaches.

My original question is based on the idea that communication between threads should be local. Ideally this communication should be local to the parent process of the threads and should not involve the network at all.

Some ideas for terminating a blocking thread are presented by Michael Fuhr here. Using a signal to wake the blocking thread sounds promising, but there seem to be issues with that approach based on this answer on Stack Overflow.
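
For illustration, here's roughly what the signal approach would look like (my own sketch, not taken from either link); the usual objection is the race noted in the comment at the end:

#include <errno.h>
#include <signal.h>
#include <stdatomic.h>
#include <string.h>
#include <sys/socket.h>

static atomic_bool terminate_requested;

static void wakeup_handler(int sig) { (void)sig; } /* no-op: exists only to interrupt syscalls */

static void install_wakeup_handler(void)
{
  struct sigaction sa;
  memset(&sa, 0, sizeof(sa));
  sa.sa_handler = wakeup_handler;
  sigemptyset(&sa.sa_mask);
  sa.sa_flags = 0; /* deliberately no SA_RESTART, so recvfrom() fails with EINTR */
  sigaction(SIGUSR1, &sa, NULL);
}

static void recv_loop(int fd)
{
  char buf[1500];
  while (!atomic_load(&terminate_requested)) {
    ssize_t n = recvfrom(fd, buf, sizeof(buf), 0, NULL, NULL);
    if (n < 0 && errno == EINTR)
      continue; /* interrupted: the loop condition re-checks the flag */
    /* ... process the datagram ... */
  }
}

/* To stop the thread from elsewhere:
 *   atomic_store(&terminate_requested, true);
 *   pthread_kill(recv_thread, SIGUSR1);
 * The fragility: if the signal lands between the flag check and the
 * recvfrom() call, it is consumed early and the thread blocks anyway
 * until the next real packet arrives. */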

The best option from what I could find seems to be to open a pipe to the thread and use something like select() to monitor I/O over both the socket and the pipe, as described in this Stack Overflow post. This approach is not without caveats, but it seems much cleaner.
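
To make that concrete, a minimal sketch of the self-pipe pattern (again my own illustration, assuming one UDP socket per receive thread):

#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

static int wake_pipe[2]; /* [0] = read end (watched by select), [1] = write end */

static void recv_loop(int sock_fd)
{
  char buf[1500];
  for (;;) {
    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(sock_fd, &rfds);
    FD_SET(wake_pipe[0], &rfds);
    int maxfd = (sock_fd > wake_pipe[0] ? sock_fd : wake_pipe[0]) + 1;

    if (select(maxfd, &rfds, NULL, NULL, NULL) < 0)
      continue; /* e.g. EINTR: just retry */

    if (FD_ISSET(wake_pipe[0], &rfds))
      break; /* termination requested; no network traffic involved */

    if (FD_ISSET(sock_fd, &rfds)) {
      (void)recv(sock_fd, buf, sizeof(buf), 0);
      /* ... process the datagram ... */
    }
  }
}

/* Setup (once, at startup):  pipe(wake_pipe);
 * To stop the thread:        write(wake_pipe[1], "x", 1); */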