cameron314 / concurrentqueue

A fast multi-producer, multi-consumer lock-free concurrent queue for C++11
Other
9.52k stars 1.66k forks source link

Add wait_dequeue_timed_from_producer #348

Closed Keynib closed 1 year ago

Keynib commented 1 year ago

It will be useful to have the function like:

    template<typename U>
    inline bool wait_dequeue_timed_from_producer(producer_token_t& token, U& item, std::int64_t timeout_usecs)
    {
        if (!sema->wait(timeout_usecs)) {
            return false;
        }
        while (!inner.try_dequeue_from_producer(token, item)) {
            continue;
        }
        return true;
    }

I think there are can be situations where some consumers are waiting messages from one producer specified by token, However, the default API doesn't contains such function.

P.S. It is relates to other wait methods, they can use producer_token_t as an input parameter also.

cameron314 commented 1 year ago

It's not possible to implement these functions because the semaphore is for all producers, not a particular one.

Keynib commented 1 year ago

@cameron314 Wait, I suppose you didn't understand me correctly, or I didn't understand something properly, let me describe one more time. The documentation contains the following part:

If you happen to know which producer you want to consume from (e.g. in a single-producer, multi-consumer scenario), you can use the try_dequeue_from_producer methods, which accept a producer token instead of a consumer token, and cut some overhead.

Note that tokens work with the blocking version of the queue too.

This is exactly my use case: I have only one producer and many consumers, which are waiting new messages from the queue. When producer enqueue the message, one of the consumers wakes up, dequeue the message and process it.

So, I suppose that I can use wait method in the combination with producer token to improve the performance. This new function is based on wait_dequeue_timed and should be called from consumers:

    template<typename U>
    inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
    {
        if (!sema->wait(timeout_usecs)) {
            return false;
        }
        while (!inner.try_dequeue(item)) {
            continue;
        }
        return true;
    }

I assume there is nothing bad to call inner.try_dequeue_from_producer instead of inner.try_dequeue(item).

Could you please check one more time and tell is it possible to use such function? Also, I have tested it and looks like it works well. Otherwise, could you describe the function which should I use instead in my case?

cameron314 commented 1 year ago

Those two statements in the README are rather unfortunately juxtaposed; they are unrelated to each other.

Tokens work with the blocking version of the queue: you can enqueue using a producer token, and dequeue using a consumer token. But you can't dequeue using a producer token (i.e. dequeue from a specific producer) in the blocking version, because there's no per-producer semaphore.

Your original implementation works only when there's a single producer. Otherwise the caller can end up in an infinite loop while simultaneously preventing other consumers from noticing an element from another producer.

cameron314 commented 1 year ago

Consider this: Alice and Bob each have a bucket. Eve throws Alice a fish and announces to Charlie (whose back is turned) that's the boat has caught one fish so far. Now Charlie knows there's a fish in one of the buckets, but not which one. However, Charlie is friends with Bob, and doesn't care about Alice. So Charlie tells Eve that's he's going to take that fish, thank you very much, and asks Bob to hand it over. But his hand is outstretched for a long time, because Bob does not, in fact, have a fish yet.

Keynib commented 1 year ago

@cameron314 Thank you for the verbose response, there is no more questions from my side.