userver-framework / userver

Production-ready C++ Asynchronous Framework with rich functionality
https://userver.tech
Apache License 2.0
2.36k stars 272 forks source link

SpscQueue race condition bug #578

Closed akhoroshev closed 3 months ago

akhoroshev commented 3 months ago

Воспроизведение:

#include <userver/concurrent/queue.hpp>
#include <userver/utest/utest.hpp>
#include <userver/utils/async.hpp>

#include <iostream>

using namespace userver;

UTEST_MT(Queue, RaceCondition, 16) {
  struct Foo {};

  using ResponseQueue = userver::concurrent::SpscQueue<Foo>;

  auto queue = ResponseQueue::Create();
  auto consumer = queue->GetConsumer();
  auto producer = queue->GetProducer();

  auto background_routine = userver::engine::AsyncNoSpan([consumer = std::move(consumer)]() mutable {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::move(consumer).Reset();
    std::cout << "consumer reseted\n";
  });

  if (producer.Push(Foo{})) {
    std::cout << "first push successed\n";
  } else {
    std::cout << "first push failed\n";
  }

  if (producer.Push(Foo{})) {
    std::cout << "second push successed\n";
  } else {
    std::cout << "second push failed\n";
  }

  background_routine.Get();
}

Чтобы лучше воспроизводилось надо "пропатчиить" это место (суть патча добавить sleep чтобы разломать инварианты)

diff --git a/core/include/userver/concurrent/queue.hpp b/core/include/userver/concurrent/queue.hpp
index 97de7533a..706ba2ba4 100644
--- a/core/include/userver/concurrent/queue.hpp
+++ b/core/include/userver/concurrent/queue.hpp
@@ -404,6 +404,18 @@ class GenericQueue<T, QueuePolicy>::SingleProducerSide final {

     used_capacity_.fetch_add(value_size);
     queue_.DoPush(token, std::move(value));
+
+    // Проблема в этом месте. Reset может сделать сброс "следующего" события.
+    //
+    // Пусть начали делать вставку в очередь и одновременно удалять единственный Consumer
+    // Во время начала вставки консьюемер еще был жив (NoMoreConsumers == false)
+    // Успешно вставляем элемент (queue_.DoPush)
+    // Consumer удаляется, при удалении он выставляет event в деструкторе (queue_->MarkConsumerIsDead())
+    // А теперь мы сбрасываем event (non_full_event_.Reset())
+    //
+    // Итого на выходе вернем true, и клиенты опять будут обращаться к Producer и блокироватья навсегда.
+    std::this_thread::sleep_for(std::chrono::seconds(3));
+
     non_full_event_.Reset();
     return true;
   }

По итогу сниппет кода виснет намертво с таким stdout:

[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from Queue
[ RUN      ] Queue.RaceCondition
consumer reseted
first push successed
akhoroshev commented 3 months ago

Вообще есть еще одна претензия к DoPush

https://github.com/userver-framework/userver/blob/develop/core/include/userver/concurrent/queue.hpp#L401 и https://github.com/userver-framework/userver/blob/develop/core/include/userver/concurrent/queue.hpp#L405

не стоило делать одной транзакцией? Если два треда провалятся в DoPush одновременно в итоге может более totalcapacity элементов оказаться в очереди

Anton3 commented 3 months ago

не стоило делать одной транзакцией? Если два треда провалятся в DoPush одновременно в итоге может более totalcapacity элементов оказаться в очереди

"Это не баг, это фича", длина очереди может незначительно превышать лимит под нагрузкой, как и описано в доке