Optimized ROS subscription pipeline using a custom CallbackQueueInterface implementation
Optimized ROS subscription pipeline, avoiding unnecessary copies by directly pushing the data to Orocos and bypassing unnecessary ROS threads.
Goal
Do not depend on ROS spinner threads for subscriptions. Minimize latency of port reads and decouple multiple components with ROS subscriptions connected to ports. At the moment they all share the same pool of ROS spinner threads.
Approach
The patch introduces a "queue" that holds no queue, so that the subscribers of Orocos messages bypass the global ros::CallbackQueue. Instead, the network thread executes the callback immediately and passes it to the global ros::CallbackQueue only if the call returns the ros::CallbackInterface::TryAgain status.
Implementation
A new implementation of ros::CallbackQueueInterface is provided that doesn't hold a queue but directly calls the call() method of the ros::CallbackInterface received.
Helper functions are provided to generate a subscriber with a custom implementation of CallbackQueueInterface.
This method is chosen for the subscribers of the message transporter.
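The idea above can be sketched as follows. This is a minimal illustration, not the code of the patch: the types in the `sketch` namespace are simplified stand-ins for the roscpp interfaces (which use boost::shared_ptr and live in the ros namespace), and the names PassthroughCallbackQueue, StoringQueue and CountingCallback are hypothetical.

```cpp
#include <cstdint>
#include <deque>
#include <memory>

// Minimal stand-ins for the roscpp interfaces so the sketch compiles without ROS.
namespace sketch {

class CallbackInterface {
public:
  enum CallResult { Success, TryAgain, Invalid };
  virtual ~CallbackInterface() {}
  virtual CallResult call() = 0;
};
typedef std::shared_ptr<CallbackInterface> CallbackInterfacePtr;

class CallbackQueueInterface {
public:
  virtual ~CallbackQueueInterface() {}
  virtual void addCallback(const CallbackInterfacePtr& callback,
                           std::uint64_t owner_id = 0) = 0;
  virtual void removeByID(std::uint64_t owner_id) = 0;
};

// Stand-in for the global ros::CallbackQueue: it only stores callbacks.
class StoringQueue : public CallbackQueueInterface {
public:
  void addCallback(const CallbackInterfacePtr& callback, std::uint64_t = 0) override {
    pending.push_back(callback);
  }
  void removeByID(std::uint64_t) override {}
  std::deque<CallbackInterfacePtr> pending;
};

// Hypothetical name: a "queue" that runs the callback in the caller's thread.
class PassthroughCallbackQueue : public CallbackQueueInterface {
public:
  explicit PassthroughCallbackQueue(CallbackQueueInterface* fallback)
      : fallback_(fallback) {}

  void addCallback(const CallbackInterfacePtr& callback,
                   std::uint64_t owner_id = 0) override {
    // Execute right away; defer to the fallback queue only on TryAgain.
    if (callback->call() == CallbackInterface::TryAgain) {
      fallback_->addCallback(callback, owner_id);
    }
  }
  void removeByID(std::uint64_t) override {}  // nothing is stored here

private:
  CallbackQueueInterface* fallback_;
};

// Helper for experiments: returns a fixed result and counts its calls.
class CountingCallback : public CallbackInterface {
public:
  explicit CountingCallback(CallResult result) : result_(result), calls(0) {}
  CallResult call() override { ++calls; return result_; }
  CallResult result_;
  int calls;
};

}  // namespace sketch
```

With this shape, a callback that succeeds never touches the fallback queue, while one that reports TryAgain is executed once and then handed over, which keeps the original spinner-based behavior as a safety net.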
References
Callbacks and spinning
Explanation about Callbacks and Spinning: http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning
See especially section 4, "Advanced: Using Different Callback Queues". Our use case is not among those listed in 4.1: we shortcut the callback queue by not having a queue at all.
Function Subscriber::subscribe()
Reference
All the overloads of Subscriber::subscribe() create a SubscribeOptions ops and call subscribe(ops). To use a custom callback queue, set ops.callback_queue to a value other than 0 (0 is the default and selects the ROS global callback queue).
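The selection rule for the callback queue can be illustrated with a small sketch. The types below are reduced stand-ins for ros::SubscribeOptions and ros::CallbackQueueInterface, and resolveQueue() is a hypothetical helper that only mirrors the "0 means global queue" rule; it is not a roscpp function.

```cpp
#include <string>

namespace options_sketch {

// Stand-in for ros::CallbackQueueInterface; only object identity matters here.
struct CallbackQueueInterface {
  virtual ~CallbackQueueInterface() {}
};
struct GlobalQueue : CallbackQueueInterface {};
struct PassthroughQueue : CallbackQueueInterface {};

// Stand-in for ros::SubscribeOptions, reduced to the fields used in this sketch.
struct SubscribeOptions {
  std::string topic;
  CallbackQueueInterface* callback_queue;
  SubscribeOptions() : callback_queue(0) {}  // 0 means "use the default queue"
};

// Hypothetical helper: a null callback_queue falls back to the global queue,
// any other value is used as given.
inline CallbackQueueInterface* resolveQueue(const SubscribeOptions& ops,
                                            CallbackQueueInterface* global_queue) {
  return ops.callback_queue ? ops.callback_queue : global_queue;
}

}  // namespace options_sketch
```

In roscpp terms this corresponds to filling in ops.callback_queue before calling subscribe(ops), as described above.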
Function CallbackQueue::callOneCB()
Reference
The function CallbackQueue::callOneCB() executes a callback and then removes it. The execution itself, inherited from CallbackInterface, is a simple cb->call(). In this case, the CallbackInterface is itself a SubscriptionQueue, which is executed through that call(). The function also handles the ros::CallbackInterface::TryAgain result, which occurs in a multi-threaded situation when the callback is already being handled by another thread. We do not expect to ever hit this case, but if it happens, we forward the callback to the global callback queue and let the ROS spinner deal with it, as in the original behavior.
Function Subscription::handleMessage()
Reference
Subscription::handleMessage() is the function that calls addCallback() to push the callback for a subscription to the global queue.
Function SubscriptionQueue::call()
Reference
The SubscriptionQueue is itself a CallbackInterface, so it can be call()ed, which performs the normal delivery of the message.
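To make the interplay of handleMessage(), addCallback() and call() concrete, here is a heavily simplified sketch of the flow. All types are stand-ins written for this illustration (the real roscpp classes carry far more state, and messages are not plain strings); ImmediateQueue is a hypothetical name for the queue-less callback queue.

```cpp
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>

namespace flow_sketch {

struct CallbackInterface {
  enum CallResult { Success, TryAgain, Invalid };
  virtual ~CallbackInterface() {}
  virtual CallResult call() = 0;
};
typedef std::shared_ptr<CallbackInterface> CallbackInterfacePtr;

struct CallbackQueueInterface {
  virtual ~CallbackQueueInterface() {}
  virtual void addCallback(const CallbackInterfacePtr& callback,
                           std::uint64_t owner_id = 0) = 0;
};

// "Queue" without a queue: runs the callback in the calling thread.
struct ImmediateQueue : CallbackQueueInterface {
  void addCallback(const CallbackInterfacePtr& callback, std::uint64_t = 0) override {
    callback->call();
  }
};

// Simplified SubscriptionQueue: buffers messages and delivers one per call().
class SubscriptionQueue : public CallbackInterface {
public:
  explicit SubscriptionQueue(std::function<void(const std::string&)> handler)
      : handler_(std::move(handler)) {}

  void push(const std::string& msg) { pending_.push_back(msg); }

  CallResult call() override {
    if (pending_.empty()) return Invalid;  // nothing to deliver
    std::string msg = pending_.front();
    pending_.pop_front();
    handler_(msg);  // user callback runs here
    return Success;
  }

private:
  std::function<void(const std::string&)> handler_;
  std::deque<std::string> pending_;
};

// Simplified handleMessage(): buffer the message, then hand the
// SubscriptionQueue itself to the configured callback queue.
inline void handleMessage(const std::string& msg,
                          const std::shared_ptr<SubscriptionQueue>& sub_queue,
                          CallbackQueueInterface& callback_queue) {
  sub_queue->push(msg);
  callback_queue.addCallback(sub_queue);
}

}  // namespace flow_sketch
```

With ImmediateQueue as the callback queue, the user handler has already run by the time handleMessage() returns, so no spinner thread is involved in the delivery.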