cameron314 / readerwriterqueue

A fast single-producer, single-consumer lock-free queue for C++
Other
3.72k stars 662 forks source link

Feature request: wait_enqueue for BlockingReaderWriterQueue #112

Closed kcgen closed 3 years ago

kcgen commented 3 years ago

The BlockingReaderWriterQueue doesn't block the producer when the queue is full, which leads to unbounded queue growth.

Unbounded queue growth is almost universally undesirable because it leads to runaway usage of resources:

And ideal system:

Currently the BlockingReaderWriterQueue meets the first (throttling the consumer), but has no mechanism to throttle the producer, so currently some form of band-aid code is needed to deal with the second.

It would be fantastic if BlockingReaderWriterQueue has a wait_enqueue as an available function. Here's some sample code to illustrate the problem:

g++ test.cpp -pthread && ./a.out

#include "readerwriterqueue.h"
#include <thread>
#include <stdio.h>

using namespace moodycamel;

constexpr auto MAX_QUEUE_DEPTH = 8;
BlockingReaderWriterQueue<int, MAX_QUEUE_DEPTH> q(MAX_QUEUE_DEPTH);

int create_next_item()
{
    static int i = 1;
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    return i++;
}

void operate_on_item(const int &item)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

int main()
{
    printf("Running single-producer, single-consumer with a %d queue depth\n",
           MAX_QUEUE_DEPTH);

    std::thread reader([&]() {
        int item;
        while (true) {
            q.wait_dequeue(item);
            operate_on_item(item);
            printf("Operated on%3d, %3lu items queued\n", item,
                   q.size_approx());
        }
    });

    std::thread writer([&]() {
        while (true) {
            const auto item = create_next_item();
            q.enqueue(item);
        }
    });

    writer.join();
    reader.join();

    assert(q.size_approx() == 0);

    return 0;
}
kcgen commented 3 years ago

I'd like to use this queue in DOSBox Staging to let us easily thread audio handling. To handle the audio, we have the producer, such as:

We only want these producers to generate at most ~30 ms of audio (which is a couple video frames) of latency. If they run ahead more than that, then the audio will de-sync from the video frames. So this is the need for wait_enqueue. :-)

The consumer is the playback side that consumes 44100 or 48000 frames/second (at a steady pace) .

cameron314 commented 3 years ago

Adding wait_enqueue would slow down all operations (2x the overhead of going from ReaderWriterQueue to BlockingReaderWriterQueue because of the second semaphore manipulation required on every operation).

It sounds like your needs might be better met with a circular ring-buffer?

cameron314 commented 3 years ago

Actually, try_enqueue already fails if there's no memory available -- so if you presize the queue appropriately and only use try_enqueue when producing, the queue effectively acts as a circular buffer.

kcgen commented 3 years ago

A ring buffer won't block the producer.

Adding wait_enqueue would slow down all operations (2x the overhead of going from ReaderWriterQueue

The cost of full queue-driven blocking are going to be borne somewhere, and I'd rather it be done transparently by an expert in this area, inside your class instead of as an external and ugly (and likely buggy) producer throttle feature.

Perhaps three classes could be offered:

try_enqueue already fails if there's no memory available -- so if you presize the queue appropriately

This would either involve endlessly spinning and burning one thread at 100% CPU usage (try enqueue.. If fail, try again, ...). Or doing something significantly slower than a semaphor notify solution, like: try enqueue? If fail Chrono::sleep(microsecond), try again ...

Do you think it's possible to implement a wait_enqeue function?

cameron314 commented 3 years ago

All right, I understand your point. It's a valid use case. However, I don't feel like it makes complete sense to add a blocking wait_enqueue method to a resizable queue, even as an option.

Instead, I propose introducing a new BlockingCircularBuffer data structure which offers what you need. I'll see what I can do. :-)

kcgen commented 3 years ago

@cameron314 , that's fantastic to hear. Thank you!

It's a valid use case.

I will agree and (just to chat more about it), will say that this new blocking circular queue is the most applicable to real-world use-cases, as unbounded resource growth (on either the producer side or consumer side) is almost always harmful; which this new class will beautifully take care of.

Can't wait to put it into action!

cameron314 commented 3 years ago

I've added a new readerwritercircularbuffer.h header in the blocking-circular-buffer branch. Logic is simple, I believe it should be correct barring any silly typo-like mistakes in my implementation. I did add some simple tests (passing) but no deep stress tests yet. Haven't tested on multiple platforms, etc. Also no benchmarks.

I'd appreciate if you could try it out and let me know if you run into any bugs or performance issues.

kcgen commented 3 years ago

Thanks for such a fast turnaround @cameron314 ! With a gift like this, 2021 is looking like a damn good year already 😅 I will do as much testing as I know how in my use case; but I'm not a threading+queue guru (hence using a best in class implementation instead). Hopefully can rely on your performance and corner-case testing as the most definitive and reliable source to prove those aspects.

kcgen commented 3 years ago

Works as expected!

Here's a simple demonstration using the new readerwritercircularbuffer.h where the goal is keep a fixed-size queue filled, while making the consuming thread wait until items are available and similarly making the producing thread wait until an empty slot is available in the queue.

The demo uses an 8-slot queue with the producer being only 25% faster than the consumer. Initially the producer works unbounded producing four items for every three consumed, until after producing 30 items it's created a backlog of 8 items in the queue, at which point the producer is throttled and only "released" to work after the consumer takes an item off the queue - ensuring the queue doesn't grow unbounded.

g++ queue.cpp -pthread && ./a.out

queue.cpp.gz

Running single-producer, single-consumer with an 8-item queue depth
Produced id-1
Produced id-2
Consumed id-1 (1 items queued)
Produced id-3
Consumed id-2 (1 items queued)
Produced id-4
Consumed id-3 (1 items queued)
Produced id-5
Produced id-6
Consumed id-4 (2 items queued)
Produced id-7
Consumed id-5 (2 items queued)
Produced id-8
Consumed id-6 (2 items queued)
Produced id-9
Produced id-10
Consumed id-7 (3 items queued)
Produced id-11
Consumed id-8 (3 items queued)
Produced id-12
Consumed id-9 (3 items queued)
Produced id-13
Produced id-14
Consumed id-10 (4 items queued)
Produced id-15
Consumed id-11 (4 items queued)
Produced id-16
Consumed id-12 (4 items queued)
Produced id-17
Produced id-18
Consumed id-13 (5 items queued)
Produced id-19
Consumed id-14 (5 items queued)
Produced id-20
Consumed id-15 (5 items queued)
Produced id-21
Produced id-22
Consumed id-16 (6 items queued)
Produced id-23
Consumed id-17 (6 items queued)
Produced id-24
Consumed id-18 (6 items queued)
Produced id-25
Produced id-26
Consumed id-19 (7 items queued)
Produced id-27
Consumed id-20 (7 items queued)
Produced id-28
Consumed id-21 (7 items queued)
Produced id-29
Produced id-30
Consumed id-22 (8 items queued)
Produced id-31
Consumed id-23 (8 items queued)
Produced id-32
Consumed id-24 (8 items queued)
Produced id-33
Consumed id-25 (8 items queued)
Produced id-34
Consumed id-26 (8 items queued)
Produced id-35
Consumed id-27 (8 items queued)
Produced id-36
Consumed id-28 (8 items queued)
Produced id-37
Consumed id-29 (8 items queued)
Produced id-38
Consumed id-30 (8 items queued)
Produced id-39
Consumed id-31 (8 items queued)
Produced id-40
cameron314 commented 3 years ago

Thanks. I just pushed an update to the internal benchmarks too :-)

kcgen commented 3 years ago

Test cases have been ported to our Google Test CI suite (currently on a branch). These unit tests run on every push, and our CI is gated based on their results.

https://github.com/dosbox-staging/dosbox-staging/blob/kc/mt32-circular-buffer-1/tests/readerwritercircularbuffer.cpp

2021-01-02_17-47

kcgen commented 3 years ago

Thanks. I just pushed an update to the internal benchmarks too :-)

Excellent! Are there comparable queue's with similar two-way blocking action in boost, TBB, and/or Folly to benchmark against?

If so, would be curious of the relative comparison, if you have some initial results (also, no worries if still tuning, etc). 👍

cameron314 commented 3 years ago

Possibly! I'm not really aware of the other implementations. SPSC circular buffers are pretty easy to find but blocking versions are rare as far as I know.

I don't have any other tricks up my sleeve, performance wise, for this particular implementation. I think leaving the code as simple as possible is better in this case. I'll update the readme sometime in the next few weeks and then merge into master.

cameron314 commented 3 years ago

For what it's worth, here's the benchmark results on my Ryzen 2400G (mingw-64 g++). Look at BRWCB vs RWQ or Folly (Folly being the closest equivalent since it's also a circular buffer, albeit a non-blocking one):

                  |----------------  Min -----------------|----------------- Max -----------------|----------------- Avg -----------------|
Benchmark         |   RWQ   |  BRWCB  |  SPSC   |  Folly  |   RWQ   |  BRWCB  |  SPSC   |  Folly  |   RWQ   |  BRWCB  |  SPSC   |  Folly  | xSPSC | xFolly
------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+-------+-------
Raw add           | 0.0002s | 0.0014s | 0.0016s | 0.0003s | 0.0003s | 0.0014s | 0.0023s | 0.0003s | 0.0003s | 0.0014s | 0.0019s | 0.0003s | 7.47x | 1.19x
Raw remove        | 0.0003s | 0.0014s | 0.0010s | 0.0002s | 0.0003s | 0.0015s | 0.0025s | 0.0002s | 0.0003s | 0.0014s | 0.0020s | 0.0002s | 7.75x | 0.98x
Raw empty remove  | 0.0026s | 0.0012s | 0.0023s | 0.0027s | 0.0037s | 0.0013s | 0.0025s | 0.0030s | 0.0032s | 0.0012s | 0.0024s | 0.0029s | 0.75x | 0.91x
Single-threaded   | 0.0033s | 0.0037s | 0.0055s | 0.0031s | 0.0034s | 0.0041s | 0.0060s | 0.0033s | 0.0034s | 0.0039s | 0.0058s | 0.0032s | 1.72x | 0.96x
Mostly add        | 0.0042s | 0.0189s | 0.0350s | 0.0058s | 0.0051s | 0.0195s | 0.0359s | 0.0062s | 0.0047s | 0.0194s | 0.0355s | 0.0061s | 7.61x | 1.30x
Mostly remove     | 0.0034s | 0.0034s | 0.0032s | 0.0031s | 0.0038s | 0.0038s | 0.0038s | 0.0033s | 0.0036s | 0.0036s | 0.0036s | 0.0032s | 0.98x | 0.87x
Heavy concurrent  | 0.0037s | 0.0193s | 0.0286s | 0.0070s | 0.0044s | 0.0257s | 0.0301s | 0.0097s | 0.0040s | 0.0225s | 0.0296s | 0.0089s | 7.49x | 2.26x
Random concurrent | 0.0126s | 0.0124s | 0.0129s | 0.0114s | 0.0136s | 0.0136s | 0.0140s | 0.0129s | 0.0132s | 0.0132s | 0.0136s | 0.0124s | 1.04x | 0.94x

Average ops/s:
    ReaderWriterQueue:                  255.36 million
    BlockingReaderWriterCircularBuffer: 237.72 million
    SPSC queue:                         149.34 million
    Folly queue:                        249.62 million
kcgen commented 3 years ago

@cameron314 ,

Good news - I've used BlockingReaderWriterCircularBuffer to replace a significant amount of user-side book-keeping and coordination, making for much easier to read and maintain code (here). Thank you!

We build with -Wall -Weffc++ CPPFLAGS, which reported a handful of uninitialized members (details follow):

../libs/readerwriterqueue/atomicops.h:499:18: 
warning: ‘moodycamel::spsc_sema::Semaphore::m_sema’ should be initialized in the member
 initialization list [-Weffc++]
  499 |       AE_NO_TSAN Semaphore(int initialCount = 0)
      |                  ^~~~~~~~~

../libs/readerwriterqueue/atomicops.h:628:18:
warning: ‘moodycamel::spsc_sema::LightweightSemaphore::m_sema’ should be initialized in
the member initialization list [-Weffc++]
  628 |       AE_NO_TSAN LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
      |                  ^~~~~~~~~~~~~~~~~~~~

../libs/readerwriterqueue/readerwritercircularbuffer.h:34:11:
warning: ‘moodycamel::BlockingReaderWriterCircularBuffer<std::array<short int, 768> >::mask’
 should be initialized in the member initialization list [-Weffc++]
   34 |  explicit BlockingReaderWriterCircularBuffer(std::size_t capacity)
      |           ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

../libs/readerwriterqueue/readerwritercircularbuffer.h:34:11:
warning: ‘moodycamel::BlockingReaderWriterCircularBuffer<std::array<short int, 768> >::rawData’
should be initialized in the member initialization list [-Weffc++]

../libs/readerwriterqueue/readerwritercircularbuffer.h:34:11:
warning: ‘moodycamel::BlockingReaderWriterCircularBuffer<std::array<short int, 768> >::data’
should be initialized in the member initialization list [-Weffc++]

Is that something you can check on your side? (I'm using GCC 10.x). Alternatively, happy to update and submit a PR!

cameron314 commented 3 years ago

Not a fan of this warning... it seems to be complaining about implicit initialization, forcing the code to be needlessly verbose or even slower (if initialization is not required) just to silence it. Nevertheless, the impact is minimal, so I've adjusted the code to avoid this warning.

kcgen commented 3 years ago

the impact is minimal, so I've adjusted the code to avoid this warning.

Thanks for this, @cameron314! Builds are running clean.

The effc++ warnings are generally helpful or improve readability, but yeah - sometimes can be a bit excessive.

forcing the code to be needlessly verbose

I agree, especially when the initial value is spread across multiple constructors. A decent option is to assign in the class definition, which brings the variable type, name, and initial value all together in one place:

    void* m_hSema = nullptr;

(As an example); but definitely nicer than spread out as you've mentioned, and this also avoids repeating assignment in multiple constructors.

kcgen commented 3 years ago

More real world testing, and the new queue is working flawlessly - thanks again @cameron314! This issue is now functionally closed for me, but will let you close it as a tie-in when moving it to the stable branch.

aaalgo commented 3 years ago

Thank you very much to make this available. I'm glad that I did the extra digging to find the blocking circular buffer branch here. I initially replaced my own implementation (which I'd prefer not to maintain) with https://github.com/Balnian/ChannelsCPP whose capacity is a compile time constant. I dropped in your circular buffer version and it's working nicely.

kcgen commented 3 years ago

@cameron314 - we've put the BlockingReaderWriterQueue queue into action in DOSBox Staging handling both our MT-32 audio and FluisSynth (MIDI) audio producer and consumer threads, with the queue managing a fixed number of audio frames to control latency -- all without any mutex/conditional/locking up on our user code.

cameron314 commented 3 years ago

The blocking circular buffer has been merged into master :-)