max0x7ba / atomic_queue

C++ lockless queue.
MIT License

Ever increasing queue size #44

Closed ZEPORATH closed 2 years ago

ZEPORATH commented 2 years ago

Hi, I am trying to use the queue in my project, where OpenCV frames are queued. I need a queue size of 5 (for the most recent frames), but what I observe is that if the consumer thread is not active, the producer keeps pushing data into the queue, which leads to an ever-increasing queue size. At a rate of 30 fps, my queue size crosses 1 GB in just a few minutes, instead of a few MB for just 5 frames.

Code snippets are as below:

  1. Declaration: atomic_queue::AtomicQueueB2<FramePacket> m_LiveFrameBuff{5};
  2. Producer call:
      auto temp = framePacket;
      m_LiveFrameBuff.push(temp);
  3. Consumer call:
      void FrameBuffManager::getLiveFrame(FramePacket &ret)
      {
          funcname();
          if (m_stopworkers.load())
              return;
          ret = m_LiveFrameBuff.pop();
      }
  4. Size print check:
      std::cout << m_LiveFrameBuff.was_size() << std::endl;

The readme also mentions other available queues like OptimistAtomicQueue2, but I am unable to find/use it. Could simpler usage examples be provided for the respective available containers? It would be really helpful.

ZEPORATH commented 2 years ago

Actually, I get it: the capacity is 4096 elements, so once the queue is full, the code busy-waits until a consumer starts emptying it. I was looking for some kind of FIFO buffer where a fixed size is maintained and, if this high-water mark (HWM) is hit, the oldest data is discarded and the new data is pushed in - a buffer with some sort of discard policy. I forgot the basics of queues. Anyway, can I change the capacity to a lower value, and if yes, how?

max0x7ba commented 2 years ago

I was looking for some kind of FIFO buffer where a fixed size is maintained and, if this high-water mark (HWM) is hit, the oldest data is discarded and the new data is pushed in - a buffer with some sort of discard policy.

Queues' only job is to store the elements, not discard them.

Rolling windows, on the other hand, are designed to keep the last N elements and discard the oldest, according to your policy.

You should note that push is designed for the use case where the queue is never full, and pop for the use case where the queue is never empty. Otherwise, you should use the try_ calls. It is safe to mix try_push with pop and push with try_pop, as well as push with try_push and pop with try_pop, but such mixed code is less trivial, which makes it harder to reason about and ensure it works as expected.
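
For illustration, a hedged sketch of this mixing, using the AtomicQueueB2 from your snippet with int elements for brevity (producer_step/consumer_step are names made up for this example):

#include <atomic_queue/atomic_queue.h>

atomic_queue::AtomicQueueB2<int> q{5}; // Capacity of 5 elements, as in your declaration.

void producer_step(int value) {
    q.push(value); // Blocking push: busy-waits while the queue is full.
}

bool consumer_step(int& out) {
    return q.try_pop(out); // Non-blocking: returns false immediately if the queue is empty.
}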

You can implement rolling-window functionality in multiple ways. The right solution depends on whether the number of producers is more than 1, and whether the number of consumers is more than 1.

For the single-producer-single-consumer (SPSC) use case (a sketch follows the steps below):

  1. The producer invokes try_push. If that succeeds, you're done.
  2. Otherwise, the producer does try_pop to extract the oldest element. Should that fail, it means the queue has become empty - the consumer managed to drain it since step 1. Either way, the queue now has spare capacity for at least 1 new element.
  3. The producer does push, which is now bound to succeed immediately.
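
A hedged sketch of these 3 steps (push_discarding_oldest is a made-up name; Q stands for e.g. the AtomicQueueB2<FramePacket> from your snippet; T must be movable and default constructible):

#include <utility>

template<class Q, class T>
void push_discarding_oldest(Q& q, T element) {
    if (q.try_push(std::move(element))) // Step 1: done if the queue was not full;
        return;                         // try_push leaves its argument intact on failure.
    T oldest;
    q.try_pop(oldest);                  // Step 2: discard the oldest element; may fail if
                                        // the consumer drained the queue since step 1.
    q.push(std::move(element));         // Step 3: bound to succeed, being the only producer.
}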

Note that there is an inherent race condition between steps 1 and 2: try_push fails in the producer; pop/try_pop in the consumer succeeds, so the queue is no longer full; the producer then does try_pop to discard the oldest element after the consumer has already done so. Such race conditions are inherent to lockless queues because there is no external lock to ensure that a series of calls (e.g. try_push followed by pop followed by push) executes atomically.

Anyway, can I change the capacity to a lower value, and if yes, how?

You can achieve the desired effect of keeping the queue size below N+1 elements by making your producer discard the oldest elements, accepting the race condition mentioned above.

The queue capacity cannot be changed after construction. That is a design trade-off to achieve the lowest possible latency of the push and pop calls, as well as of the hand-off between the two of them.


For your problem of one prolific producer and one slow consumer that is only interested in the last N elements, I would start with a simple std::list<T> along with std::mutex and std::condition_variable. Both the producer and the consumer lock the mutex before modifying the list. This way the producer can discard the oldest element to ensure that the list never grows beyond N elements on push. The condition variable only needs to be signalled when push changes the rolling window state from empty to non-empty. Then benchmark to see whether such a queue is a bottleneck in your application.

These lockless queues are designed for the use case when blocking queues become the bottleneck in your application, e.g. multiple producers and/or multiple consumers contending on the mutex to modify the queue. Your use case of one producer doing 30 pushes/sec and one consumer not being able to keep up is rather pedestrian; you shouldn't need anything beyond basic std:: components for that. They are easy to use and hard to misuse relative to lockless solutions. E.g.:

#include <condition_variable>
#include <cstddef>
#include <list>
#include <mutex>
#include <utility>

template<class T>
class LastN {
    std::list<T> queue_, free_;
    std::mutex m_;
    std::condition_variable c_;

public:
    LastN(std::size_t size_max) 
        : free_(size_max) // Allocate all list nodes up front; T must be default constructible.
    {}

    template<class... Args>
    void push(Args&&... args) {
        std::lock_guard<std::mutex> l(m_);
        auto const queue_empty = queue_.empty();
        if(free_.empty()) // The queue grew above the limit, reuse the oldest list node for the new element.
            free_.splice(free_.end(), queue_, queue_.begin());
        auto free_node = free_.begin();
        *free_node = T(std::forward<Args>(args)...); // Construct T from args and move-assign it into the reused node.
        queue_.splice(queue_.end(), free_, free_node);
        if(queue_empty) // The queue was empty, wake up a popper now.
            c_.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> l(m_);
        while(queue_.empty())
            c_.wait(l);
        free_.splice(free_.end(), queue_, queue_.begin()); // Move the front (oldest) node to the free list.
        return std::move(free_.back()); // Move T into the return value.
    }
};

This is a basic, well-performing sketch. The main idea is to minimize the duration for which the mutex is held and to maximize scalability by avoiding any memory allocations in push and pop. These functions lock the mutex only for the duration of moving a list node from one list to another and move-assigning T, which had better be cheap.
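
For example, a minimal usage sketch (FramePacket here is a hypothetical stand-in for the OpenCV frame wrapper; LastN as defined above):

#include <chrono>
#include <cstdio>
#include <thread>

struct FramePacket { int id; }; // Hypothetical stand-in for the OpenCV frame wrapper.

int main() {
    LastN<FramePacket> live_frames(5); // Rolling window of the 5 most recent frames.

    std::thread producer([&] {
        for (int i = 0; i != 100; ++i) {
            live_frames.push(FramePacket{i}); // Discards the oldest frame once 5 are queued.
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    });

    std::thread consumer([&] {
        for (int i = 0; i != 20; ++i)
            std::printf("got frame %d\n", live_frames.pop().id); // Blocks while the window is empty.
    });

    producer.join();
    consumer.join();
}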