max0x7ba / atomic_queue

C++ lockless queue.
MIT License
1.51k stars 180 forks source link

Create a queue with object ptrs as queue elements. #49

Closed Yashavanth-CP closed 1 year ago

Yashavanth-CP commented 1 year ago

I am trying to create a queue with Class Object pointers as elements of Queue for my exploration and taken sample code from src/example.c. The example.c uses uint32_t and also -1 as the special value that cannot be pushed/popped and what could be it for queue of pointers of objects ? For eg:

class file :
class Room {
    public:
        double length;
        double breadth;
        double height;

        double calculateArea(){
            return length * breadth;
        }

        double calculateVolume(){
            return length * breadth * height;
        }

};

main file : 
#include "atomic_queue/atomic_queue.h"

#include "Element.h"
#include <thread>
#include <cstdint>
#include <chrono>
#include <iostream>

int main() {
    using namespace std::chrono_literals;
    int constexpr PRODUCERS = 1; // Number of producer threads.
    int constexpr CONSUMERS = 1; // Number of consumer threads.
    unsigned constexpr N = 500; // Each producer pushes this many elements into the queue.
    unsigned constexpr CAPACITY = 400; // Queue capacity. Since there are more consumers than producers the queue doesn't need to be large.

    using Element = Room *; // Queue element type
    Element constexpr NILELEMENT = reinterpret_cast<Element>(0xdeaddead);
//    static constexpr Element invalid1 = reinterpret_cast<Element>(0xdeadead);
    //Element constexpr NILELEMENT = static_cast<Element>(&room); // Atomic elements require a special value that cannot be pushed/popped.
    using Queue = atomic_queue::AtomicQueueB<Element, std::allocator<Element>, NILELEMENT>; // Use heap-allocated buffer.

    // Create a queue object shared between all producers and consumers.
    Queue q{CAPACITY};

    // Start the consumers.
    uint64_t sums[CONSUMERS];
    std::thread consumers[CONSUMERS];
    for(int i = 0; i < CONSUMERS; ++i)
        consumers[i] = std::thread([&q, &sum = sums[i]]() {
            uint64_t s = 0; // New object with automatic storage duration. Not aliased or false-shared by construction
        auto n = q.pop();
            while(n != NILELEMENT) { // Break the loop when 0 is pop'ed. 
                free(n);
        n = q.pop();
        s++;
        std::cout << " Consuming here " << n << " " << s << std::endl << std::flush;
        std::this_thread::sleep_for(20ms);
        }
            // Store into sum only once because it is element of sums array, false-sharing the same cache line with other threads.
            // Updating sum in the loop above saturates the inter-core bus with cache coherence protocol messages.
            std::cout << " Done with Consumer " << std::endl << std::flush;
            sum = s;
        });

    // Start the producers.
    std::thread producers[PRODUCERS];
    for(int i = 0; i < PRODUCERS; ++i)
        producers[i] = std::thread([&q, &NILELEMENT]() {
            // Each producer pushes range [1, N] elements into the queue.
            // Ascending order [1, N] requires comparing with N at each loop iteration. Ascending order isn't necessary here.
            // Push elements in descending order, range [N, 1] with step -1, so that CPU decrement instruction sets zero/equal flag
            // when 0 is reached, which breaks the loop without having to compare n with N at each iteration.
            for(auto n = N; n > 0; --n) {
                q.push(new Room());
        }
        q.push(NILELEMENT);
        std::cout << "Done with pushing " << std::endl << std::flush;
        });

    // Wait till producers have terminated.
    for(auto& t : producers)
        t.join();

    // Tell consumers to terminate by pushing one 0 element for each consumer.
    for(int i = CONSUMERS; i--;)
        q.push(nullptr);
    // Wait till consumers have terminated.
    for(auto& t : consumers)
        t.join();
    // When all consumers have terminated the queue is empty.

    // Sum up consumer's received elements sums.
    uint64_t total_sum = 0;
    for(auto& sum : sums) {
        total_sum += sum;
        if(!sum) // Verify that each consumer received at least one element.
            std::cerr << "WARNING: consumer " << (&sum - sums) << " received no elements.\n";
    else
        std::cerr << "CONSUMER CONSUMED ALL ELEMENTS \n";
    }

    // Verify that each element has been pop'ed exactly once; not corrupted, dropped or duplicated.
    uint64_t constexpr expected_total_sum =  N * PRODUCERS;
    if(int64_t total_sum_diff = total_sum - expected_total_sum) {
        std::cerr << "ERROR: unexpected total_sum difference " << total_sum_diff << '\n';
        return EXIT_FAILURE;
    }

    std::cout << "Successfully completed " << std::endl;
    return EXIT_SUCCESS;
}

I am unable to get this compiled,

   src/example.cc: In function ‘int main()’:
   src/example.cc:18:36: error: ‘reinterpret_cast’ from integer to pointer
   18 |     Element constexpr NILELEMENT = reinterpret_cast<Element>(0xdeaddead);

I have also tried with static_cast and nothing works.

I expect this generic queue to support any datatype (pointer). Any help is appreciated to get this compiled..

Thanks in advance.

max0x7ba commented 1 year ago

The error you observe

src/example.cc:18:36: error: ‘reinterpret_cast’ from integer to pointer

which is better spelled out by clang:

note: reinterpret_cast is not allowed in a constant expression

is due to the fact reinterpret_cast is not allowed in a constant expression, see point 18 in https://en.cppreference.com/w/cpp/language/constant_expression


The default NIL value is T{}, which is 0/null pointer for pointer types. Null pointer NIL value is standard and reasonable for most applications, so that there is no need to explicitly specify NIL value.

The example code uses a queue of integeres and defines a different non-zero NIL for demonstration purposes and its own conveniece. In other words, it demonstrates what's possible, but not necessary.

Only in cases when one would like to push null pointers into the queue, the default 0/null NIL must be set to something else.