ros / ros_comm

ROS communications-related packages, including core client libraries (roscpp, rospy, roslisp) and graph introspection tools (rostopic, rosnode, rosservice, rosparam).
http://wiki.ros.org/ros_comm
761 stars 912 forks source link

message_filters.Cache sometimes misses messages from latched publishers #2189

Open heuristicus opened 3 years ago

heuristicus commented 3 years ago

Sometimes when using multiple caches in the same node on the same topic, which is latched, the expected behaviour of the cache does not happen.

What I expect to happen is that when the cache is initialised, it is immediately populated with the message on the latched topic. This should happen regardless of how many caches I create.

However, this is not the case. Due to the way that the cache is initialised, it is possible for the latched topic message to be missed entirely.

Here's a minimal working example:

#!/usr/bin/env python

import rospy
from geometry_msgs.msg import PoseStamped
import message_filters

def first_cb(msg):
    rospy.loginfo("first callback called")

def second_cb(msg):
    rospy.loginfo("second callback called")

def third_cb(msg):
    rospy.loginfo("third callback called")

def fourth_cb(msg):
    rospy.loginfo("fourth callback called")

if __name__ == "__main__":
    rospy.init_node("message_filters_cache_test")
    test_topic = "/cache_test"
    # Create a latched publisher
    pub = rospy.Publisher(test_topic, PoseStamped, latch=True, queue_size=1)
    pub.publish(PoseStamped())
    rospy.sleep(1)

    rospy.loginfo("Starting first cache")
    cache_one = message_filters.Cache(
        message_filters.Subscriber(test_topic, PoseStamped)
    )
    cache_one.registerCallback(first_cb)

    rospy.loginfo("Starting second cache")
    cache_two = message_filters.Cache(
        message_filters.Subscriber(test_topic, PoseStamped)
    )
    cache_two.registerCallback(second_cb)

    rospy.loginfo("Starting third cache")
    cache_three = message_filters.Cache(
        message_filters.Subscriber(test_topic, PoseStamped)
    )
    cache_three.registerCallback(third_cb)

    rospy.loginfo("Starting fourth cache")
    cache_four = message_filters.Cache(
        message_filters.Subscriber(test_topic, PoseStamped)
    )
    cache_four.registerCallback(fourth_cb)

    rospy.spin()

I suggest running this script several times. The output that I would expect to see from this script is something like

[INFO] [1633092471.990110] [/message_filters_cache_test]: Starting first cache
[INFO] [1633092472.005591] [/message_filters_cache_test]: Starting second cache
[INFO] [1633092472.011472] [/message_filters_cache_test]: Starting third cache
[INFO] [1633092472.019149] [/message_filters_cache_test]: Starting fourth cache
[INFO] [1633092472.019846] [/message_filters_cache_test]: first callback called
[INFO] [1633092472.027311] [/message_filters_cache_test]: second callback called
(messages below this are written by me)
[INFO] [1633092472.038421] [/message_filters_cache_test]: third callback called
[INFO] [1633092472.027311] [/message_filters_cache_test]: fourth callback called

However, in actuality it is often the case that the third and fourth callbacks are not called. I have never seen the fourth callback be called.

Here's the order of relevant things that happen when initialising a cache (see https://github.com/ros/ros_comm/blob/noetic-devel/utilities/message_filters/src/message_filters/__init__.py)

  1. The message_filters.Subscriber is initialised, which creates a normal subscriber inside it, and registers the message_filters.Subscriber.callback method as a callback.
  2. This subscriber is passed into the message_filters.Cache initialisation, which calls its connectInput function, which uses the SimpleFilter.registerCallback method to add the Cache's add method to the list of callbacks
  3. The user adds their own callback to the Cache by calling the registerCallback function.

The problem is, sometimes the message_filters.Subscriber that underpins all of this receives a message before either of the registerCallback functions are called, which means that either the user's callback is not called, but the cache's callback is, or neither of the callbacks are called.

I'm not sure of the best way to ensure this does not happen. I think the simplest way would be to add a start function to the message_filters.Subscriber, which can be called by the Cache to actually initialise the ros subscriber, which would be done at the end of the initialisation of the cache. Additionally, it may be worth having a start function in the cache, so that the user can register their callbacks and then start the cache/subscriber once they have added them all. This would guarantee that if the subscriber instantly receives a message, all relevant callbacks are actually called. This could be controlled by an autostart argument which would disable automatically starting the subscriber.

heuristicus commented 3 years ago

With the changes in #2190, I modified the test code:

#!/usr/bin/env python

import rospy
from geometry_msgs.msg import PoseStamped
import message_filters

def first_cb(msg):
    rospy.loginfo("first callback called")

def second_cb(msg):
    rospy.loginfo("second callback called")

def third_cb(msg):
    rospy.loginfo("third callback called")

def fourth_cb(msg):
    rospy.loginfo("fourth callback called")

def default_subscriber_init():
    rospy.loginfo("Starting first cache")
    cache_one = message_filters.Cache(
        message_filters.Subscriber(test_topic, PoseStamped)
    )
    cache_one.registerCallback(first_cb)

    rospy.loginfo("Starting second cache")
    cache_two = message_filters.Cache(
        message_filters.Subscriber(test_topic, PoseStamped)
    )
    cache_two.registerCallback(second_cb)

    rospy.loginfo("Starting third cache")
    cache_three = message_filters.Cache(
        message_filters.Subscriber(test_topic, PoseStamped)
    )
    cache_three.registerCallback(third_cb)

    rospy.loginfo("Starting fourth cache")
    cache_four = message_filters.Cache(
        message_filters.Subscriber(test_topic, PoseStamped)
    )
    cache_four.registerCallback(fourth_cb)

def delayed_subscriber_init():
    rospy.loginfo("Starting first cache")
    sub_one = message_filters.Subscriber(test_topic, PoseStamped, auto_start=False)
    cache_one = message_filters.Cache(sub_one)
    cache_one.registerCallback(first_cb)
    sub_one.start()

    rospy.loginfo("Starting second cache")
    sub_two = message_filters.Subscriber(test_topic, PoseStamped, auto_start=False)
    cache_two = message_filters.Cache(sub_two)
    cache_two.registerCallback(second_cb)
    sub_two.start()

    rospy.loginfo("Starting third cache")
    sub_three = message_filters.Subscriber(test_topic, PoseStamped, auto_start=False)
    cache_three = message_filters.Cache(sub_three)
    cache_three.registerCallback(third_cb)
    sub_three.start()

    rospy.loginfo("Starting fourth cache")
    sub_four = message_filters.Subscriber(test_topic, PoseStamped, auto_start=False)
    cache_four = message_filters.Cache(sub_four)
    cache_four.registerCallback(fourth_cb)
    sub_four.start()

if __name__ == "__main__":
    rospy.init_node("message_filters_cache_test")
    test_topic = "/cache_test"
    # Create a latched publisher
    pub = rospy.Publisher(test_topic, PoseStamped, latch=True, queue_size=1)
    pub.publish(PoseStamped())
    rospy.sleep(1)

    rospy.loginfo("Creating caches with default initialisation")
    default_subscriber_init()
    rospy.sleep(1)
    rospy.loginfo("-----------------------------------------------")
    rospy.loginfo("Creating caches with delayed sub initialisation")
    delayed_subscriber_init()

    rospy.spin()

The output in cases where the subscriber receives a message too quickly looks like

[INFO] [1633352455.883093] [/message_filters_cache_test]: Creating caches with default initialisation
[INFO] [1633352455.886750] [/message_filters_cache_test]: Starting first cache
[INFO] [1633352455.893901] [/message_filters_cache_test]: Starting second cache
[INFO] [1633352455.897127] [/message_filters_cache_test]: Starting third cache
[INFO] [1633352455.900537] [/message_filters_cache_test]: Starting fourth cache
[INFO] [1633352455.909031] [/message_filters_cache_test]: first callback called
[INFO] [1633352455.912484] [/message_filters_cache_test]: second callback called
[INFO] [1633352455.915431] [/message_filters_cache_test]: third callback called
[INFO] [1633352456.911450] [/message_filters_cache_test]: -----------------------------------------------
[INFO] [1633352456.914936] [/message_filters_cache_test]: Creating caches with delayed sub initialisation
[INFO] [1633352456.918040] [/message_filters_cache_test]: Starting first cache
[INFO] [1633352456.921355] [/message_filters_cache_test]: first callback called
[INFO] [1633352456.924562] [/message_filters_cache_test]: Starting second cache
[INFO] [1633352456.927686] [/message_filters_cache_test]: second callback called
[INFO] [1633352456.930950] [/message_filters_cache_test]: Starting third cache
[INFO] [1633352456.934159] [/message_filters_cache_test]: third callback called
[INFO] [1633352456.937385] [/message_filters_cache_test]: Starting fourth cache
[INFO] [1633352456.940466] [/message_filters_cache_test]: fourth callback called