ApolloAuto / apollo

An open autonomous driving platform
Apache License 2.0

cyber base Bounded Queue deadlock #13159

Open Tao-Fu opened 3 years ago

Tao-Fu commented 3 years ago

Describe the bug When BoundedQueue is used with BlockWaitStrategy, a thread blocked in WaitEnqueue is never woken up once the queue is full. The following test blocks forever.

TEST(BoundedQueueTest, BlockWaitStrategyNeverStop) {
  BoundedQueue<int> queue;
  queue.Init(2, new BlockWaitStrategy());
  queue.Enqueue(1);
  queue.Enqueue(1);
  std::cout << "--------" << std::endl;
  std::thread t([&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    int value;
    queue.WaitDequeue(&value);
    queue.WaitDequeue(&value);
    std::cout << "Thread: " << queue.Size() << std::endl;
  });
  std::cout << "Main " << queue.Size() << std::endl;
  queue.WaitEnqueue(1);
  t.join();
}

daohu527 commented 3 years ago

reason

When the queue is full and BlockWaitStrategy is in use, a thread in BoundedQueue's WaitEnqueue is never woken up. Because the queue is full, Enqueue fails and WaitEnqueue goes on to call wait_strategy_->EmptyWait(). Dequeue never sends a notification, so the thread waits on the condition_variable forever.

template <typename T>
bool BoundedQueue<T>::WaitEnqueue(const T& element) {
  while (!break_all_wait_) {
    if (Enqueue(element)) {  // fails when the queue is full
      return true;
    }
    if (wait_strategy_->EmptyWait()) {  // waits on the condition_variable
      continue;
    }
    // wait timeout
    break;
  }

  return false;
}

solution

Add a NotifyOne() call to Dequeue, as Enqueue already does.

template <typename T>
bool BoundedQueue<T>::Dequeue(T* element) {
  uint64_t new_head = 0;
  uint64_t old_head = head_.load(std::memory_order_acquire);
  do {
    new_head = old_head + 1;
    if (new_head == commit_.load(std::memory_order_acquire)) {
      return false;
    }
    *element = pool_[GetIndex(new_head)];
  } while (!head_.compare_exchange_weak(old_head, new_head,
                                        std::memory_order_acq_rel,
                                        std::memory_order_relaxed));
  wait_strategy_->NotifyOne();  // added: wake a thread blocked in WaitEnqueue
  return true;
}
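
To make the effect of the missing notification concrete, here is a minimal, self-contained sketch. It is not Apollo's code: SketchWaitStrategy and ProducerWokenByDequeue are hypothetical names, the queue itself is left out, and a timeout plus a sticky flag are assumptions added so the example terminates and does not depend on thread timing (the real EmptyWait in the issue blocks with no way out).

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a blocking wait strategy (not Apollo's class).
class SketchWaitStrategy {
 public:
  // Records the notification and wakes one waiting thread, if any.
  void NotifyOne() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      notified_ = true;
    }
    cv_.notify_one();
  }

  // Returns true if a notification arrived before the timeout expired.
  bool EmptyWait(std::chrono::milliseconds timeout) {
    assert(timeout.count() > 0);
    std::unique_lock<std::mutex> lock(mutex_);
    bool woke = cv_.wait_for(lock, timeout, [this] { return notified_; });
    notified_ = false;  // consume the notification
    return woke;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool notified_ = false;
};

// Models a producer parked on a full queue while a consumer frees a slot.
// Returns true only if the consumer's dequeue woke the producer.
bool ProducerWokenByDequeue(bool notify_in_dequeue) {
  SketchWaitStrategy strategy;
  std::thread consumer([&strategy, notify_in_dequeue] {
    // The consumer would remove an element here.
    if (notify_in_dequeue) {
      strategy.NotifyOne();  // the notification proposed in the solution
    }
  });
  bool woke = strategy.EmptyWait(std::chrono::milliseconds(500));
  consumer.join();
  return woke;
}
```

With notify_in_dequeue set to false the producer only comes back when the timeout expires, which corresponds to the hang in the test; with true it is woken by the consumer.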

daohu527 commented 3 years ago

There is a small problem with adding wait_strategy_->NotifyOne() to Dequeue: it can send a notification that nobody needs.

  1. We call WaitDequeue while the queue is empty, so it waits on the condition.
  2. An Enqueue arrives, adds an object to the queue, and notifies one waiting WaitDequeue.
  3. The woken WaitDequeue calls Dequeue, which succeeds and then calls NotifyOne again, even though the queue is already empty and nobody is waiting.
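
The three steps above can be replayed with a small bookkeeping model. This is only a single-threaded sketch that counts notifications instead of blocking; TraceStrategy, TraceQueue, and ReplayInvalidNotify are hypothetical names and do not exist in Apollo.

```cpp
#include <cassert>
#include <deque>

// Hypothetical bookkeeping model: counts notifications instead of blocking.
struct TraceStrategy {
  int waiters = 0;  // threads currently parked in a wait
  int useful = 0;   // notifications that woke a waiter
  int wasted = 0;   // notifications sent while nobody was waiting

  void BeginWait() { ++waiters; }

  void NotifyOne() {
    if (waiters > 0) {
      --waiters;
      ++useful;
    } else {
      ++wasted;
    }
  }
};

// Unbounded toy queue whose Enqueue and Dequeue both notify.
struct TraceQueue {
  std::deque<int> items;
  TraceStrategy* strategy = nullptr;

  bool Enqueue(int value) {
    items.push_back(value);
    strategy->NotifyOne();
    return true;
  }

  bool Dequeue(int* out) {
    if (items.empty()) return false;
    *out = items.front();
    items.pop_front();
    strategy->NotifyOne();  // the notification added by the proposed fix
    return true;
  }
};

// Replays steps 1 to 3 and returns the resulting counters.
TraceStrategy ReplayInvalidNotify() {
  TraceStrategy strategy;
  TraceQueue queue;
  queue.strategy = &strategy;
  int value = 0;

  // Step 1: the consumer finds the queue empty and starts waiting.
  bool got = queue.Dequeue(&value);
  assert(!got);
  strategy.BeginWait();

  // Step 2: an Enqueue adds an element and wakes the waiting consumer.
  queue.Enqueue(42);

  // Step 3: the woken consumer dequeues and notifies with nobody waiting.
  got = queue.Dequeue(&value);
  assert(got && value == 42);
  (void)got;
  return strategy;
}
```

In this model the replay ends with one useful and one wasted notification. With a plain std::condition_variable, notify_one has no effect when no thread is waiting, so the extra call should cost a little work without changing behavior.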