cameron314 / concurrentqueue

A fast multi-producer, multi-consumer lock-free concurrent queue for C++11

BlockingConcurrentQueue in multi-threads #301

Closed devinzhang91 closed 2 years ago

devinzhang91 commented 2 years ago

I use two BlockingConcurrentQueue instances (q1, q2) to link a producer, workers, and a consumer. The model looks like this: 1 producer -> q1 -> 4 workers (consumer/producer) -> q2 -> 1 consumer. But after all threads have been joined, q2 is not empty, and I'm not sure whether I'm using the queue incorrectly. I use -1 as the end signal: a worker or the consumer shuts down its thread when it receives it. This code may need to be run several times before the error occurs.

void test1(){
    ///1 producer + 4 workers(consumer/producer) + 1 consumer
    ///32 blocks queue
    ///producer -> q1 -> workers -> q2 -> consumer
    moodycamel::BlockingConcurrentQueue<int> q1, q2;
    int n_thread=4;
    ///counts how many workers have not yet received the end signal
    std::atomic_int flag_cnt(n_thread);
    ///single producer
    std::thread producer_thread([&]{
        for (int i = 0; i != 10; ++i) {
            while(false == q1.try_enqueue(i)){
                printf("q1.try_enqueue:%3d fail.\n",i);
            };
        }
        ///add 4 EndFlag (4 workers)
        int end_flags[] = { -1,-1,-1,-1 };
        while(false == q1.try_enqueue_bulk(end_flags, n_thread));
    });

    ///multi-consumer
    std::vector< std::thread > vec_worker_threads;
    for (int i = 0; i < 4; ++i) {
        vec_worker_threads.emplace_back(std::thread([&,i]{
            int item;
            ///loop dequeue q1 and enqueue q2, until get EndFlag
            while(1){
                q1.wait_dequeue(item);
                if(item==-1){
                    /// -1 is end of signal
                    printf("i: %d,q1 get -1\n",i);
                    break;
                }
                printf("i: %d,q1:%3d\n",i,item);
                while(false == q2.try_enqueue(item)){
                    printf("i: %d,q2.try_enqueue:%3d fail.\n",i,item);
                };
            }
            ///add EndFlag if flag_cnt==0, indicate q1 is empty.
            int flag = --flag_cnt;
            printf("i: %d,flag: %d\n",i,flag);
            if(0==flag){
                while(false == q2.enqueue(-1)){
                    printf("i: %d,q2.enqueue:%3d fail.\n",i,-1);
                };
                printf("i: %d,End of q1\n",i);
            }
        }));
    }

    ///single consumer
    std::thread consumer_thread([&]{
        int item;
        ///loop dequeue q2 until get EndFlag
        while(1){
            q2.wait_dequeue(item);
            if(item==-1){
                printf("End of q2\n");
                break;
            }
            printf("q2:%3d\n", item);
        }
    });
    ///join all threads
    producer_thread.join();
    for (int i = 0; i < 4; ++i) {
        vec_worker_threads[i].join();
    }
    consumer_thread.join();
    ///check whether q1 and q2 are empty (size_approx() returns size_t, so print with %zu)
    if(0 != q1.size_approx()){ printf("[error]: q1 is not empty:%3zu\n", q1.size_approx()); };
    if(0 != q2.size_approx()){ printf("[error]: q2 is not empty:%3zu\n", q2.size_approx()); };
    ///dump the last data in q2
    while(0!=q2.size_approx()){
        int item;
        q2.wait_dequeue(item);
        printf("q2: last data: %3d\n", item);
    }
}
cameron314 commented 2 years ago

See the README. The final order between items that are enqueued by different threads/producers is not deterministic. Here, only the items enqueued to q2 by the same worker thread that enqueues -1 are guaranteed to be processed by the consumer thread.

To fix this example, consider having each worker thread enqueue -1 to q2 when done, and move the flag_cnt logic into the consumer thread.
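The suggested fix could be sketched roughly as follows. This is a minimal illustration, not code from the library or from the reporter. `SimpleBlockingQueue` is a hypothetical mutex-based stand-in so the sketch compiles without the library headers. It exposes only `enqueue` and `wait_dequeue`, the same two calls used on `moodycamel::BlockingConcurrentQueue<int>` above. `run_pipeline` and `kEnd` are also names invented for this sketch. Each worker sends its own -1 to q2 after its last item, and the consumer stops only once it has counted one -1 per worker.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in (NOT the moodycamel implementation): a mutex-based
// blocking queue with the same enqueue / wait_dequeue call shape.
template <typename T>
class SimpleBlockingQueue {
public:
    bool enqueue(const T& item) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(item);
        }
        cv_.notify_one();
        return true;
    }
    void wait_dequeue(T& item) {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        item = q_.front();
        q_.pop();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
};

constexpr int kEnd = -1;  // end signal, as in the original example

// Runs producer -> q1 -> workers -> q2 -> consumer and returns the items
// the consumer received, sorted for easy checking.
std::vector<int> run_pipeline(int n_items, int n_workers) {
    assert(n_workers > 0);
    SimpleBlockingQueue<int> q1, q2;

    std::thread producer([&] {
        for (int i = 0; i < n_items; ++i) q1.enqueue(i);
        // one end signal per worker
        for (int w = 0; w < n_workers; ++w) q1.enqueue(kEnd);
    });

    std::vector<std::thread> workers;
    for (int w = 0; w < n_workers; ++w) {
        workers.emplace_back([&] {
            int item;
            for (;;) {
                q1.wait_dequeue(item);
                if (item == kEnd) break;
                q2.enqueue(item);
            }
            // every worker sends its own end signal after its last item
            q2.enqueue(kEnd);
        });
    }

    std::vector<int> received;
    std::thread consumer([&] {
        int remaining = n_workers;  // the counting now lives in the consumer
        int item;
        while (remaining > 0) {
            q2.wait_dequeue(item);
            if (item == kEnd) --remaining;
            else received.push_back(item);
        }
    });

    producer.join();
    for (auto& t : workers) t.join();
    consumer.join();
    std::sort(received.begin(), received.end());
    return received;
}
```

Because each -1 is enqueued by the same thread that enqueued the items before it, the per-producer ordering described above is enough: once the consumer has seen every worker's -1, it has already dequeued every worker's items. Calling `run_pipeline(10, 4)` should therefore return all ten values regardless of how the workers interleave.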

devinzhang91 commented 2 years ago

Thank you for your reply. I have moved the flag_cnt logic into the q2 end detection in the consumer thread, and it now works correctly.