ros2 / rclcpp

rclcpp (ROS Client Library for C++)
Apache License 2.0
563 stars 427 forks source link

Starvation in MultiThreadedExecutor #2645

Open fmrico opened 1 month ago

fmrico commented 1 month ago

Bug report

Required Info:

Steps to reproduce issue

Use this program. The commented lines are used by me to trace the problem, using https://github.com/fmrico/yaets

#include <fstream>

// #include "yaets/tracing.hpp"

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

using namespace std::chrono_literals;
using std::placeholders::_1;

// yaets::TraceSession session("session1.log");

class ProducerNode : public rclcpp::Node
{
public:
  ProducerNode() : Node("producer_node")
  {
    pub_1_ = create_publisher<std_msgs::msg::Int32>("topic_1", 100);
    pub_2_ = create_publisher<std_msgs::msg::Int32>("topic_2", 100);
    timer_ = create_wall_timer(1ms, std::bind(&ProducerNode::timer_callback, this));
  }

  void timer_callback()
  {
    // TRACE_EVENT(session);
    message_.data += 1;
    pub_1_->publish(message_);
    message_.data += 1;
    pub_2_->publish(message_);
  }

private:
  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_1_, pub_2_;
  rclcpp::TimerBase::SharedPtr timer_;
  std_msgs::msg::Int32 message_;
};

class ConsumerNode : public rclcpp::Node
{
public:
  ConsumerNode() : Node("consumer_node")
  {
    sub_2_ = create_subscription<std_msgs::msg::Int32>(
      "topic_2", 100, std::bind(&ConsumerNode::cb_2, this, _1));
    sub_1_ = create_subscription<std_msgs::msg::Int32>(
      "topic_1", 100, std::bind(&ConsumerNode::cb_1, this, _1));

    timer_ = create_wall_timer(10ms, std::bind(&ConsumerNode::timer_callback, this));
  }

  void cb_1(const std_msgs::msg::Int32::SharedPtr msg)
  {
    // TRACE_EVENT(session);

    waste_time(500us);
  }

  void cb_2(const std_msgs::msg::Int32::SharedPtr msg)
  {
    // TRACE_EVENT(session);

    waste_time(500us);
  }

  void timer_callback()
  {
    // TRACE_EVENT(session);

    waste_time(5ms);
  }

  void waste_time(const rclcpp::Duration & duration)
  {
    auto start = now();
    while (now() - start < duration);
  }

private:
  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_1_;
  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_2_;
  rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);

  auto node_pub = std::make_shared<ProducerNode>();
  auto node_sub = std::make_shared<ConsumerNode>();

  // rclcpp::executors::SingleThreadedExecutor executor;
  rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), 8);

  executor.add_node(node_pub);
  executor.add_node(node_sub);

  executor.spin();

  rclcpp::shutdown();
  return 0;
}

Expected behavior

The behavior expected is more or less what happens with SingleThreadedExecution:

Single_Threaded_jazzy

Actual behavior

Callback for /topic2 (cb2) suffers starvation because most of the time timer and cb1 has event and messages to process

Multi_Thread_jazzy

mjcarroll commented 1 month ago

Can you re-profile with the callback groups set to Reentrant? While I don't think that the timing looks correct here, using purely mutually exclusive callback groups with multithreaded executor is kind of an anti-pattern. It causes the waitset to get rebuilt every wait, which is a pretty expensive operation, relatively speaking.

fmrico commented 1 month ago

Hi @mjcarroll

What you asked for:

MultiThreadedExecutor with Callback Group Reentrant

As you might expect, there is no starvation, but the user must manage any race conditions.

Multi_Threaded_Reentrant

Bonus!! EventsExecutor

This is correct, as it is not multithreaded, but it confirms that there is no starvation.

EventsExecutor_Reentrant

Discussion on Starvation on a reasonable setup

I understand that it may be an antipattern, but I think it is not unreasonable for a user to want to use a MultiThreadedExecutor with MutuallyExclusive when he has several nodes, to parallelize both. In this case, there is also a starvation of one of the two callbacks of each node (cb2 is never executed). I will first put the graph and then the code used, which is basically the same as before, duplicating the node. I know there are other ways to do it, but this was the fastest :)

Multi_Threaded_TwoConsumers

#include <fstream>

#include "yaets/tracing.hpp"

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

using namespace std::chrono_literals;
using std::placeholders::_1;

yaets::TraceSession session("session1.log");

class ProducerNode : public rclcpp::Node
{
public:
  ProducerNode() : Node("producer_node")
  {
    pub_1_ = create_publisher<std_msgs::msg::Int32>("topic_1", 100);
    pub_2_ = create_publisher<std_msgs::msg::Int32>("topic_2", 100);
    timer_ = create_wall_timer(1ms, std::bind(&ProducerNode::timer_callback, this));
  }

  void timer_callback()
  {
    TRACE_EVENT(session);
    message_.data += 1;
    pub_1_->publish(message_);
    message_.data += 1;
    pub_2_->publish(message_);
  }

private:
  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_1_, pub_2_;
  rclcpp::TimerBase::SharedPtr timer_;
  std_msgs::msg::Int32 message_;
};

class ConsumerNode1 : public rclcpp::Node
{
public:
  ConsumerNode1() : Node("consumer_node1")
  {
    // custom_cb_ = create_callback_group(rclcpp::CallbackGroupType::Reentrant);

    // rclcpp::SubscriptionOptions options;
    // options.callback_group = custom_cb_;

    sub_2_ = create_subscription<std_msgs::msg::Int32>(
      "topic_2", 100, std::bind(&ConsumerNode1::cb_2, this, _1));
    sub_1_ = create_subscription<std_msgs::msg::Int32>(
      "topic_1", 100, std::bind(&ConsumerNode1::cb_1, this, _1));

    timer_ = create_wall_timer(
      10ms, std::bind(&ConsumerNode1::timer_callback, this));
  }

  void cb_1(const std_msgs::msg::Int32::SharedPtr msg)
  {
    TRACE_EVENT(session);

    waste_time(500us);
  }

  void cb_2(const std_msgs::msg::Int32::SharedPtr msg)
  {
    TRACE_EVENT(session);

    waste_time(500us);
  }

  void timer_callback()
  {
    TRACE_EVENT(session);

    waste_time(5ms);
  }

  void waste_time(const rclcpp::Duration & duration)
  {
    auto start = now();
    while (now() - start < duration);
  }

private:
  // rclcpp::CallbackGroup::SharedPtr custom_cb_;
  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_1_;
  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_2_;
  rclcpp::TimerBase::SharedPtr timer_;
};

class ConsumerNode2 : public rclcpp::Node
{
public:
  ConsumerNode2() : Node("consumer_node2")
  {
    // custom_cb_ = create_callback_group(rclcpp::CallbackGroupType::Reentrant);

    // rclcpp::SubscriptionOptions options;
    // options.callback_group = custom_cb_;

    sub_2_ = create_subscription<std_msgs::msg::Int32>(
      "topic_2", 100, std::bind(&ConsumerNode2::cb_2, this, _1));
    sub_1_ = create_subscription<std_msgs::msg::Int32>(
      "topic_1", 100, std::bind(&ConsumerNode2::cb_1, this, _1));

    timer_ = create_wall_timer(
      10ms, std::bind(&ConsumerNode2::timer_callback, this));
  }

  void cb_1(const std_msgs::msg::Int32::SharedPtr msg)
  {
    TRACE_EVENT(session);

    waste_time(500us);
  }

  void cb_2(const std_msgs::msg::Int32::SharedPtr msg)
  {
    TRACE_EVENT(session);

    waste_time(500us);
  }

  void timer_callback()
  {
    TRACE_EVENT(session);

    waste_time(5ms);
  }

  void waste_time(const rclcpp::Duration & duration)
  {
    auto start = now();
    while (now() - start < duration);
  }

private:
  // rclcpp::CallbackGroup::SharedPtr custom_cb_;
  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_1_;
  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_2_;
  rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);

  auto node_pub = std::make_shared<ProducerNode>();
  auto node_sub1 = std::make_shared<ConsumerNode1>();
  auto node_sub2 = std::make_shared<ConsumerNode2>();

  // rclcpp::executors::SingleThreadedExecutor executor;
  rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), 8);
  // rclcpp::experimental::executors::EventsExecutor executor;

  executor.add_node(node_pub);
  executor.add_node(node_sub1);
  executor.add_node(node_sub2);

  executor.spin();

  rclcpp::shutdown();
  return 0;
}
mjcarroll commented 1 month ago

I think it is not unreasonable for a user to want to use a MultiThreadedExecutor with MutuallyExclusive when he has several nodes, to parallelize both.

In this case, each node (and all corresponding internal entities) would end up with a separate callback group (by default, at least).

I believe this also makes a small difference in your first example and second example. In the first, it was multiple subscriptions on the same node (and callback group). In the second, you created two separate nodes, each having their own callback group, so re-entrant vs mutex has less of an impact.

There are two goals. By default, users shouldn't have to think about the threading safety, so all callbacks will be fired mutually exclusive. The second goal would be that adding many nodes to a single multi-threaded executor should cause many of the threads to be used, as each node would have it's own cbg, and not conflict with execution of other callbacks.

fmrico commented 1 month ago

I understand that you don't consider this a problem if the user knows it can suffer starvation and actively do something to avoid it. It would be good to write a warning anywhere in the documentation to warn the user to take the appropriate actions.

Anyway, I'd like to waste a few cycles of my boring youth exploring some alternatives :P

Thanks for your time @mjcarroll

mjcarroll commented 1 month ago

I understand that you don't consider this a problem if the user knows it can suffer starvation and actively do something to avoid it.

I do think it's probably surprising and warrants a notice or warning in the documentation.

Everything comes with a trade-off on effort vs reward, I suppose. The best case performance should come when each possible entity is either re-entrant or part of it's own callback group. The worst case performance should be when every entity is in a single mutually exclusive callback group.

Also, since the executor has a fixed priority order where timers are executed first, high rate timers can cause the starvation issue that you are discussing as well.

Since you are also in the process of evaluating, executors, you may also want to take a look at @jmachowinski's multithreaded events executor: https://discourse.ros.org/t/the-ros-2-c-executors/38296/21

fujitatomoya commented 1 month ago

@mjcarroll @fmrico

IMO, this result indicates the inefficiency MultiThreadedExecutor with MutuallyExclusive compared to SingleThreadedExecutror?

The behavior expected is more or less what happens with SingleThreadedExecution:

This result from 1st example. SingleThreadedExecutor is fast enough to take the both subscription data one by one except timer gets ready because the order to take data is 1st timer and then subscription.

Callback for /topic2 (cb2) suffers starvation because most of the time timer and cb1 has event and messages to process

with MultiThreadedExecutor, it looks like almost no topic_2 callback is called. i expect this that this is inefficiency for MultiThreadedExecutor, probably it takes some time to take the data compared to SignleThreadedExecutor, and when it tries to take data either timer or topic_1 (topic_1 is 1st one in the subscription map), and unlikely to reach to topic_2.

after all, i think this issue stands out as inefficiency of MultiThreadedExecutor? what do you think?

CC: @alsora @jmachowinski

mjcarroll commented 1 month ago

Yes, there is a cost to the bookkeeping that the multithreaded executor requires. Perhaps it can continue to be made better (that overhead can be reduced), but there is always going to be some additional cost over the single-threaded implementation.

We should generally be advising people to use the multithreaded executor only when it makes sense for their application. There isn't a general "one thread good, more threads better" recommendation in this case.

mjcarroll commented 1 month ago

I believe that we determined that even the single threaded executor can be starved with high enough frequency timer updates https://github.com/ros2/rclcpp/issues/392

alsora commented 1 month ago

I believe that we determined that even the single threaded executor can be starved with high enough frequency timer updates https://github.com/ros2/rclcpp/issues/392

I think that with waitset executors, the implicit ordering in which entities is checked (timers -> subscriptions -> services -> clients -> waitables from https://github.com/ros2/rclcpp/blob/rolling/rclcpp/src/rclcpp/executor.cpp#L754) means that every type of entity can starve the lower-priority ones.

Events-based executors address the problem by executing entities in the order they are triggered.

We should generally be advising people to use the multithreaded executor only when it makes sense for their application

Yes, when possible single-threaded executors should currently be preferred.

jmachowinski commented 1 month ago

For me, this is total expected behavior. This is the 'if a timer takes longer than it's period, only the timer is executed' problem, just a bit different.

Note that in the example shown, the subscriptions already take all available processing time. Therefore the timer will stall one callback because of the priority ordering.

jmachowinski commented 1 month ago

with MultiThreadedExecutor, it looks like almost no topic_2 callback is called. i expect this that this is inefficiency for MultiThreadedExecutor, probably it takes some time to take the data compared to SignleThreadedExecutor, and when it tries to take data either timer or topic_1 (topic_1 is 1st one in the subscription map), and unlikely to reach to topic_2.

The effect we are seeing here is the rebuild / masking of the waitsets. Single Threaded Executor:

Multi Threaded Executor:

This all comes down to the fact, that the list of ready entities is 'forgotten' as soon as a new waitset returns from wait. And exactly the reason why I programmed the CallbackGroupExecutor...

fujitatomoya commented 1 month ago

@jmachowinski thanks for the explanation.

and yes, this is really expensive.

https://github.com/ros2/rclcpp/blob/f8aea8cc51116ff27fe29fd170d4f23148e9f709/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp#L106

with understanding all the facts, what i mean is that this is really hard to see for user... i think that is the origin of this issue.

We should generally be advising people to use the multithreaded executor only when it makes sense for their application.

probably we should add documentation https://docs.ros.org/en/rolling/Concepts/Intermediate/About-Executors.html about this? in general, MultiThreadedExecutor is recommended with CallbackGroupType::Reentrant?

fmrico commented 1 month ago

In general, MultiThreadedExecutor is recommended with CallbackGroupType::Reentrant?

IMHO, this is not a solid generalization because we could use a MultiThreadedExecutor with a node with two callback groups (all MutuallyExclusive) for an example like the following figures, in which callbacks do not execute concurrently, but the timer callback does. I think it could be enough to warn that with a MultiThreadedExecutor, it is not guaranteed that all callbacks in the same callback group will execute when there are always messages to process in more than one subscription.

executors_cbg_diagram_2 Figure_1

fmrico commented 1 month ago

Hi,

I have found a possible fix to this problem. The idea is to isolate from the order in which the subscriptions were created. The method WaitResult::next_ready_subscription iterates from 0 to find the first subscription with data to process, and it depends on this creation order. The workaround is starting this check each time with a different consecutive index, so the creation order is not relevant to select the next subscription to process.

I have implemented a draft of the solution here:

https://github.com/fmrico/rclcpp/blob/a228eb4c458a8cc93252003411707691c419e653/rclcpp/include/rclcpp/wait_result.hpp#L209-L229

There is probably a better implementation, but the idea seems to work.

Before this change

before

After this change

after

jmachowinski commented 1 month ago

While working for this special problem, your fix would not work for the timer starvation problem. If you increase the waste time of the timer in your example to 11ms, you will see a total starvation in case of the MultithreadedExecutor, vs a 'round robin' scheduling in the Single threaded executor.

Try this executor : https://github.com/cellumation/cm_executors It is multi threaded, and should work as expected.

mjcarroll commented 1 month ago

There is probably a better implementation, but the idea seems to work.

We had previously discussed having the execution order of ready entities be managed by some sort of policy. That way users could specify which items to prioritize. I think that the idea wasn't followed through on because it's a difficult space to generalize for all applications.

Depending on the level of control and the layout of the application, I would recommend trying the cellumation executor (or at least coming to hear about it at ROSCon).

Alternatively, if you are looking for deep control of the scheduling semnatics, you could also directly use the rclcpp::WaitSet object and skip using the executor altogether. This gives maximum control over the runtime threading model.

HarunTeper commented 3 weeks ago

Hello,

I had a pull request for this issue last year around this time (https://github.com/ros2/rclcpp/pull/2360), and have since proposed an executor design that prevents starvation in the multi-threaded executor:

You can read about it here

https://ieeexplore.ieee.org/document/9622336

https://daes.cs.tu-dortmund.de/storages/daes-cs/r/publications/teper2024emsoft_preprint.pdf

I fixed this issue using an additional mutex lock, which prevents callbacks from being removed from blocked callback groups. Although this may introduce large latencies for callbacks of such groups, it prevents callbacks from being completely starved, a property that I think is very useful to natively support the execution of "most" systems.

If needed, I can modify the current executor design and implement the changes I proposed in the paper. This would guarantee that no callback is starved due to mutually exclusive callback groups.

fujitatomoya commented 3 weeks ago

I had a pull request for this issue last year around this time (https://github.com/ros2/rclcpp/pull/2360), and have since proposed an executor design that prevents starvation in the multi-threaded executor:

i do not think that https://github.com/ros2/rclcpp/pull/2360 can fix the problem here. i may be mistaken but can you elaborate a bit on https://github.com/ros2/rclcpp/pull/2360 i left the comment there.

HarunTeper commented 3 weeks ago

My previous pull request does not include the fix that is proposed in the work that I published, as I completely reworked the mechanism to prevent starvation. However, when I opened that pull request, I was looking if this issue is worth fixing.

I am planning to either update my previous pull request or create a pull request with my newest executor version. Which option do you prefer?

jmachowinski commented 3 weeks ago

New PR