cameron314 / concurrentqueue

A fast multi-producer, multi-consumer lock-free concurrent queue for C++11

Elements cannot be consumed in time #396

Closed by mengzhisy 2 months ago

mengzhisy commented 2 months ago

This is a great project, and it has been in use in our production environment for some time. However, we recently discovered an issue where the behavior of the queue is not quite consistent with my initial understanding.

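#include <chrono>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include "concurrentqueue.h"

int main() {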
    struct Element {
        bool print = true;
        int64_t val;
    };
    moodycamel::ConcurrentQueue<Element> q;
    static std::mutex m; // lock for std::cout
    std::vector<std::thread> threads;
    for (int t = 0; t < 4; t++) {
        threads.emplace_back([&]() {
            while (true) {
                Element v;
                if (q.try_dequeue(v)) {
                    if (v.print) {
                        std::lock_guard<std::mutex> g(m);
                        std::cout << "consumed: " << v.val << std::endl;
                        v.print = false;
                    }

                    // Some tasks will be handled here
                    std::this_thread::sleep_for(std::chrono::milliseconds(50));

                    // If needed, the element will be re-enqueued here.
                    // I found that if this happens, the element will always cut in line,
                    // causing other elements in the queue to not be consumed in time.
                    q.enqueue(v);
                }
            }
        });
    }

    for (int i = 0; i < 32; i++) {
        q.enqueue(Element{true, i});
    }

    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::lock_guard<std::mutex> g(m);
        std::cout << "waiting..." << std::endl;
    }
}

consumed: 0
consumed: 1
consumed: 2
consumed: 3
consumed: 5
consumed: 4
consumed: 6
consumed: 7
consumed: 9
consumed: 8
waiting...
waiting...
waiting...
waiting...
consumed: 10
waiting...
waiting...
waiting...
waiting...
waiting...
waiting...
waiting...
waiting...
waiting...
waiting...
waiting...
waiting...

As shown in the output above, elements 11 to 31 are not consumed in time. I don't have strict requirements on the execution order, but I don't want elements to wait in the queue for too long without being consumed. Do you have any suggested solutions?

cameron314 commented 2 months ago

Fairness is not guaranteed. What's happening here is that each consumer thread re-enqueues to an implicit producer queue specific to that thread. These implicit inner queues happen to be polled for elements before the one used by main, so the elements enqueued by main keep being passed over indefinitely.
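
The effect can be illustrated with a deliberately simplified toy model. This is a sketch under stated assumptions, not the library's actual code: `ToyQueue`, `tryDequeue`, and `distinctItemsSeen` are hypothetical names, the model is single-threaded, and it assumes a plain fixed-order scan over per-producer sub-queues, which is simpler than whatever polling the real queue does.

```cpp
#include <cstddef>
#include <deque>
#include <set>
#include <vector>

// Toy model only: each inner deque stands in for one producer's sub-queue.
// This is NOT moodycamel's implementation; it just scans in a fixed order.
struct ToyQueue {
    std::vector<std::deque<int>> subQueues;

    explicit ToyQueue(std::size_t producers) : subQueues(producers) {}

    void enqueue(std::size_t producer, int value) {
        subQueues[producer].push_back(value);
    }

    // Always scans from sub-queue 0, so earlier sub-queues win every time.
    bool tryDequeue(int& out) {
        for (auto& sq : subQueues) {
            if (!sq.empty()) {
                out = sq.front();
                sq.pop_front();
                return true;
            }
        }
        return false;
    }
};

// Sub-queue 0 plays the worker's own queue, sub-queue 1 plays main's.
// Returns how many distinct items the worker sees within `rounds` dequeues.
std::size_t distinctItemsSeen(int items, int rounds) {
    ToyQueue q(2);
    for (int i = 0; i < items; ++i) {
        q.enqueue(1, i);  // "main" fills its own sub-queue
    }

    std::set<int> seen;
    for (int r = 0; r < rounds; ++r) {
        int v = 0;
        if (!q.tryDequeue(v)) break;
        seen.insert(v);
        q.enqueue(0, v);  // the worker re-enqueues into its own sub-queue
    }
    return seen.size();
}
```

In this model, `distinctItemsSeen(32, 1000)` returns 1: once the first item has been re-enqueued into the worker's own sub-queue, that sub-queue is never empty when scanned, so the remaining 31 items in main's sub-queue are never reached.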

Using explicit consumer tokens will improve fairness, but again, nothing is guaranteed.
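
In the library, this means constructing a `moodycamel::ConsumerToken` from the queue in each consumer thread and passing it to the `try_dequeue` overload that accepts a token. Why a per-consumer cursor helps can be sketched with a toy model. Everything below is an assumption made for illustration: `ToyConsumerToken`, `ToyRotatingQueue`, and `distinctItemsSeenWithToken` are hypothetical names, the model is single-threaded, and its rotate-after-every-dequeue policy is not claimed to match the library's real rotation rules.

```cpp
#include <cstddef>
#include <deque>
#include <set>
#include <vector>

// Hypothetical stand-in for a consumer token: it only remembers
// which sub-queue the next scan should start from.
struct ToyConsumerToken {
    std::size_t next = 0;
};

// Toy model only, NOT moodycamel's implementation.
struct ToyRotatingQueue {
    std::vector<std::deque<int>> subQueues;

    explicit ToyRotatingQueue(std::size_t producers) : subQueues(producers) {}

    void enqueue(std::size_t producer, int value) {
        subQueues[producer].push_back(value);
    }

    // Starts scanning at the token's cursor and advances the cursor after
    // each success, so consecutive dequeues visit the sub-queues in turn.
    bool tryDequeue(ToyConsumerToken& tok, int& out) {
        const std::size_t n = subQueues.size();
        for (std::size_t step = 0; step < n; ++step) {
            const std::size_t idx = (tok.next + step) % n;
            if (!subQueues[idx].empty()) {
                out = subQueues[idx].front();
                subQueues[idx].pop_front();
                tok.next = (idx + 1) % n;
                return true;
            }
        }
        return false;
    }
};

// Same scenario as the issue: sub-queue 1 holds main's items and the worker
// re-enqueues into sub-queue 0. Returns the number of distinct items seen.
std::size_t distinctItemsSeenWithToken(int items, int rounds) {
    ToyRotatingQueue q(2);
    for (int i = 0; i < items; ++i) {
        q.enqueue(1, i);
    }

    ToyConsumerToken tok;
    std::set<int> seen;
    for (int r = 0; r < rounds; ++r) {
        int v = 0;
        if (!q.tryDequeue(tok, v)) break;
        seen.insert(v);
        q.enqueue(0, v);
    }
    return seen.size();
}
```

Here `distinctItemsSeenWithToken(32, 1000)` returns 32, because the cursor alternates between the worker's sub-queue and main's, so every item from main is eventually reached. As noted above, the real queue still makes no fairness guarantee; the toy only shows the direction of the improvement.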

mengzhisy commented 2 months ago

> Using explicit consumer tokens will improve fairness, but again, nothing is guaranteed.

Great, this solved my problem. Thank you very much!