Closed lucasw closed 1 week ago
rosbag record calls doCheckMaster once a second https://github.com/ros-o/ros_comm/blob/obese-devel/tools/rosbag/src/recorder.cpp#L661-L666 and uses ros::master::getTopics(topics)
to get a current list of topics.
https://github.com/ros-o/ros_comm/blob/obese-devel/clients/roscpp/src/libros/master.cpp#L122-L140
bool getTopics(V_TopicInfo& topics)
{
XmlRpc::XmlRpcValue args, result, payload;
args[0] = this_node::getName();
args[1] = ""; //TODO: Fix this
if (!execute("getPublishedTopics", args, result, payload, true))
{
return false;
}
topics.clear();
for (int i = 0; i < payload.size(); i++)
{
topics.push_back(TopicInfo(std::string(payload[i][0]), std::string(payload[i][1])));
}
return true;
}
https://github.com/ros-o/ros_comm/blob/obese-devel/clients/roscpp/src/libros/master.cpp#L175-L245
Fortunately that exists already in roslibrust master client get_published_topics
:
https://github.com/Carter12s/roslibrust/blob/master/roslibrust/src/ros1/master_client.rs#L306-L322
MasterClient
is a member in Node
and those are in NodeHandle, so see if node_handle.node.master_client.get_published_topics()
works - no inner (the node in node handle) is private
Had to expose several things in roslibrust https://github.com/lucasw/roslibrust/tree/pub_master_client in order to be able to create a MasterClient on the outside https://github.com/lucasw/mcap_tools/blob/main/mcap_tools/src/bin/rostopic_list.rs
Now how to subscribe without deserialization, just get a vector of bytes? If I could instantiate a normal subscriber then adding a next_raw() function would be easy, but can I turn string into a message type to create the subscriber in the first place?
(also once it is time to record into the mcap, I need to have the message definition- I think I can get that from roslibrust)
If there's a hashmap of all message type generated then could extract the definition, or supply the type to the subscriber
geometry_msgs::PointStamped::DEFINITION
impl ::roslibrust_codegen::RosMessageType for #struct_name {
const ROS_TYPE_NAME: &'static str = #ros_type_name;
const MD5SUM: &'static str = #md5sum;
const DEFINITION: &'static str = #raw_message_definition;
}
I think the building blocks are there in subscriber.rs - the topic type ends up getting turn back into a string, and I think *
for md5sum will match everything, leave msg_definition blank.
let connection_header = ConnectionHeader {
caller_id: node_name.to_string(),
latching: false,
msg_definition,
md5sum: "*",
topic: Some(topic_name.to_owned()),
topic_type: topic_type.to_owned(),
tcp_nodelay: false,
service: None,
};
But in addition to a custom Subscription, there needs to be code in node/actor.rs to support it as well- add a register_subscriber_raw() or similar, and subscribe_raw() in node/handle.rs
But can't just make a SubscriberAny type, it has to be Subscriber
Example of recording to mcap from ros in C++ (which has topic_tools::ShapeShifter for the generic message subscription) https://github.com/lucasw/ros_one2z/blob/main/one2z/src/topic_to_mcap.cpp
Try detecting if subscriber type is std_msgs ByteMultiArray and if so fill response with raw message bytes, and configure subscriber in first place with connection header information
rospy.AnyMsg
: https://github.com/ros-o/ros_comm/blob/d5a1b6dae32e6db636d628a11c7633abe3f01974/clients/rospy/src/rospy/msg.py#L48
rospy.names.TOPIC_ANYTYPE
'*'
shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
ROS_INFO("Subscribing to %s", topic.c_str());
ros::NodeHandle nh;
shared_ptr<int> count(boost::make_shared<int>(options_.limit));
shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
ros::SubscribeOptions ops;
ops.topic = topic;
ops.queue_size = 100;
ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
ops.helper = boost::make_shared<ros::SubscriptionCallbackHelperT<
const ros::MessageEvent<topic_tools::ShapeShifter const> &> >(
boost::bind(&Recorder::doQueue, this, boost::placeholders::_1, topic, sub, count));
ops.transport_hints = options_.transport_hints;
*sub = nh.subscribe(ops);
currently_recording_.insert(topic);
num_subscribers_++;
return sub;
}
template<>
struct MD5Sum<topic_tools::ShapeShifter>
{
static const char* value(const topic_tools::ShapeShifter& m) { return m.getMD5Sum().c_str(); }
// Used statically, a shapeshifter appears to be of any type
static const char* value() { return "*"; }
};
template<>
struct DataType<topic_tools::ShapeShifter>
{
static const char* value(const topic_tools::ShapeShifter& m) { return m.getDataType().c_str(); }
// Used statically, a shapeshifter appears to be of any type
static const char* value() { return "*"; }
};
template<>
struct Definition<topic_tools::ShapeShifter>
{
static const char* value(const topic_tools::ShapeShifter& m) { return m.getMessageDefinition().c_str(); }
};
} // namespace message_traits
Receiving raw bytes is now working in https://github.com/lucasw/roslibrust/tree/subscribe_any and https://github.com/lucasw/mcap_tools/blob/main/mcap_tools/src/mcap_record.rs
But need to rebase roslibrust on latest changes to upstream roslibrust.
Next will need to get information out of the connection_header when writing messages to disk- can that be returned with each next_raw() call?
Seeing an issue where next_raw() triggers several times on same message, specifically a marti_common_msgs Float64Stamped
See this somewhat frequently under moderate load (e.g. 35 MB/s, 26% cpu), maybe the queue size needs to be larger:
2024-08-09T14:12:52.074Z ERROR [mcap_record] Err(Lagged(61))
2024-08-09T14:13:13.723Z ERROR [mcap_record] Err(Lagged(4))
2024-08-09T14:13:21.676Z ERROR [mcap_record] Err(Lagged(4))
2024-08-09T14:13:24.279Z ERROR [mcap_record] Err(Lagged(5))
2024-08-09T14:13:26.322Z ERROR [mcap_record] Err(Lagged(177))
The lagged if from next() Err(RecvError::Lagged(n)) => return Some(Err(SubscriberError::Lagged(n))),
rosbag record -a
takes only 15% cpu on same dataset, though does have a rosbag record buffer exceeded. Dropping oldest queued message.
occasionally with default buffer size.
Increasing the queue size in mcap_record does help. Need to run tokio console to learn more.
It could purely be a difference in compression costs?
rosbag record -a --bz2 takes 48% cpu rosbag record -a --lz4 taks 25-35% cpu, so mcap with default compression (zstd) + roslibrust wins, though lz4 takes less disk space
Went down probably a wrong path in subscribe_any_next_fn
, try again with a function created in subscribe_any or regular subscribe() that gets used in next()
test roslibrust
ROS_PACKAGE_PATH=`rospack find std_msgs`:`rospack find geometry_msgs`:`rospack find sensor_msgs`:`rospack find std_srvs` cargo test --features ros1_test test_subscribe_any
but have to comment out example_package and example_package_macro?
Have to run roscore in background for the ros native integration tests.
Also this was working earlier, but now it never completes, times out after 60 seconds.
This looks to exclude example_package
ROS_PACKAGE_PATH=`rospack find std_msgs`:`rospack find std_srvs` cargo test --release -p roslibrust --features ros1_test _any -- --show-output
and it will run both publish and subscribe any tests
get debug logs
RUST_LOG=debug ROS_PACKAGE_PATH=`rospack find std_msgs`:`rospack find std_srvs` cargo test --release -p roslibrust --features ros1_test subscribe_any -- --nocapture
See missing messages when running mcap record currently, try making a standalone subscriber node or a roslibrust integration test with >1000 hz publishing.
rosbag play --delay 1.0
helps some, also polling the rosmaster more often for new topics, increasing queue sizes, but still not getting all of them.
Additional experimentation with rosbag record -a shows it is also dropping about the same amount of messages, so not going to worry about this for now.
-> investigate later with mcap_intersection
Need to disable statistics when doing a rosbag play -> rosbag record / mcap_record test, or conversely enable statistics but remap any statistics topic when playing the source bag.
I thought plotjuggler fully supported ros1 mcap, and it loads and plots them fine, but one feature that doesn't work is the ROS Topic Re-Publisher
, I wonder if it gives up on any topic where it doesn't find an md5sum.
(rosbag stores the md5sum in the message)
The foxglove go code is using the md5sum in the conversion process only to handle the case where two different md5sums appear on the same topic (from different publishers presumably), but it doesn't actually write them to the mcap schema or channel.
rostopic echo needs the md5sum from the publisher
When I use ros1_play_mcap.py
I'm using roslib get_message_class() to look up the message and that gets the md5sum.
Does the anymsg published actually make it onto the wire, or never gets published? It does go onto the wire, because rostopic hz works with it.
Now have playback working with md5sum generation https://github.com/lucasw/roslibrust/commit/04bb108823f42e97352c0c71dc441cbaa928c0ad (subscribe_any branch)
Duplicate rosbag record but in rust and for mcap.
Need to poll or get updates when new topics appear.
How to even get a rostopic list? Does roslibrust expose anything through node handle?
The rosmaster api https://wiki.ros.org/ROS/Master_API has interaction for getting lists of topics?