max0x7ba / atomic_queue

C++ lockless queue.
MIT License

The queue does not accept NIL value. #23

Closed ghost closed 4 years ago

ghost commented 4 years ago

Hello @max0x7ba

I have a shared fixed-size buffer whose element type is a large record. Instead of storing the buffer elements in the queue directly, I store only buffer indices. One issue I have with the queue is that push() and try_push() do not accept a zero value, which is the first index of the buffer. I want to know whether there is any way to make the queue accept a zero value.

Thanks

max0x7ba commented 4 years ago

Queues for atomic elements require one element value to be reserved for NIL. By default the queue uses a value-initialized element (0 for fundamental types). You can specify a different value for NIL as a template argument. E.g. if your element type is unsigned, you can specify static_cast<unsigned>(-1) as the NIL value, as long as your indexes are from a limited range [0, N], where N < static_cast<unsigned>(-1).

Another way is to post pointers and never post nullptr.

Alternatively, the queue versions for non-atomic elements do not require reserving an element value.
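For example, a minimal sketch of the first option, reserving static_cast<unsigned>(-1) as NIL so that index 0 becomes a valid element value (the header path and the capacity of 10 here are only illustrative):

#include <atomic_queue/atomic_queue.h>

int main() {
    // The third template argument is NIL: unsigned(-1) is reserved instead of 0.
    atomic_queue::AtomicQueue<unsigned, 10, static_cast<unsigned>(-1)> indices;

    indices.push(0u); // OK: 0 is now an ordinary element value

    unsigned i;
    if (indices.try_pop(i)) {
        // use buffer index i
    }
}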

ghost commented 4 years ago

Your advice is really helpful, thank you. I actually tried your library just a few days ago; I did not know the NIL value could be customized.

By the way, I have a question that does not relate to your library directly, but I think you may be able to give me an idea. So far I often use two queues for the producer-consumer problem: one serves as a pool of free indices and the other as the product queue. The code is like below:

constexpr auto SIZE = 10;
atomic_queue::AtomicQueue<int, SIZE, -1> qu_producer, indices_pool;

struct BigRecord {
    int data;
};
BigRecord sharedBuffer[SIZE]{};

for (auto i = 0; i < SIZE; ++i)
    indices_pool.push(i);

std::thread producer([&] {
    while (bAppRunning) {
        int item{};
        if (indices_pool.try_pop(item)) {
            // fill data
            sharedBuffer[item].data = rand();
            fmtLog::trace("producer: {}", sharedBuffer[item].data);
            qu_producer.push(item);
        }
    }
});

std::thread consumer([&] {
    while (bAppRunning) {
        int item{};
        if (qu_producer.try_pop(item)) {
            fmtLog::trace("consumed data: {}", sharedBuffer[item].data);
            indices_pool.try_push(item);
        }
    }
});

The struct BigRecord here is just an example; in practice it may contain more complex data members. My question is: how can I avoid the additional indices_pool but still use the queue as an "atomic queue" with a cheap push (without copying the big data)? Thanks.

max0x7ba commented 4 years ago

If you only have one producer thread and one consumer thread, then a single-producer-single-consumer ring-buffer like boost::spsc_queue, but with separate commit functions, is the most efficient solution for your problem. It is wait-free and zero-copy. E.g.:

#include <atomic>
#include <thread>

struct BigRecord { int data; };

class ProducerConsumerQueue {
    // Free-running counters: free_ counts committed (produced) elements,
    // pend_ counts consumed elements. SIZE is a power of 2, so the % SIZE
    // indexing stays correct when the counters wrap around.
    std::atomic<unsigned> free_{0};
    std::atomic<unsigned> pend_{0};

    static int constexpr SIZE = 1024;
    BigRecord buffer_[SIZE];

public:
    // Producer: returns the next slot to fill in place, or nullptr if the queue is full.
    BigRecord* back() noexcept {
        auto beg = pend_.load(std::memory_order_acquire);
        auto end = free_.load(std::memory_order_relaxed);
        return (end - beg < SIZE) ? buffer_ + (end % SIZE) : nullptr;
    }

    // Producer: commits the slot obtained from back().
    void push_back() {
        free_.fetch_add(1, std::memory_order_release);
    }

    // Consumer: returns the next slot to read in place, or nullptr if the queue is empty.
    BigRecord* front() noexcept {
        auto beg = pend_.load(std::memory_order_relaxed);
        auto end = free_.load(std::memory_order_acquire);
        return beg != end ? buffer_ + (beg % SIZE) : nullptr;
    }

    // Consumer: releases the slot obtained from front().
    void pop_front() {
        pend_.fetch_add(1, std::memory_order_release);
    }
};

int main() {
    ProducerConsumerQueue q;
    constexpr int N = 32 * 1024 * 1024;

    // Producer fills slots in place and commits them; data == 0 is the stop marker.
    std::thread producer([&] {
        for(int i = N; i >= 0;) {
            if(BigRecord* r = q.back()) {
                r->data = i--;
                q.push_back();
            }
        }
    });

    // Consumer reads slots in place and stops once it sees the 0 marker.
    std::thread consumer([&] {
        for(bool stop = false; !stop;) {
            if(BigRecord* r = q.front()) {
                stop = !r->data;
                q.pop_front();
            }
        }
    });

    producer.join();
    consumer.join();
}

ghost commented 4 years ago

Thanks for your detailed answer. Your example seems to work; my queue, however, needs one producer but allows multiple consumers to access it concurrently. Regardless of that, there are still some issues with your solution. The first is that your queue has no mechanism to track whether it is empty or full. Second, since the producer and consumer operate in parallel without checking for fullness and emptiness, at some point the two may overrun each other's head/tail. Additionally, I have just found one issue with your atomic_queue; I will post it in another thread to avoid confusing other people.

max0x7ba commented 4 years ago

In the future, you may like to provide full details of your problem and code samples that correspond to your use case. Not everyone can read your mind.