eclipse-iceoryx / iceoryx

Eclipse iceoryx™ - true zero-copy inter-process-communication
https://iceoryx.io
Apache License 2.0

Some questions about IndexQueue. #2297

Open Xcliu opened 1 month ago

Xcliu commented 1 month ago

Brief description

In my application, I've used the lock-free queue in iceoryx and noticed that the push operation is taking approximately 2 seconds to complete in some cases. Upon reviewing the lock-free queue's implementation, I've observed that the pop function within the IndexQueue could potentially lead to an infinite loop scenario when multiple threads are attempting to pop items simultaneously without any threads pushing new items. Am I correct in this assessment, or could there be other factors at play that I might have overlooked?

I would greatly appreciate any advice or insights on this matter.

Detailed information

https://github.com/eclipse-iceoryx/iceoryx/blob/cb2608a77f362d86e705269ae6bf6ba3340e6ddf/iceoryx_hoofs/concurrent/buffer/include/iox/detail/mpmc_lockfree_queue/mpmc_index_queue.inl#L139

elfenpiff commented 1 month ago

@Xcliu

Am I correct in this assessment, or could there be other factors at play that I might have overlooked?

No, you are correct, and it is valid behavior for lock-free algorithms. The guarantee that a lock-free algorithm gives you is that at least one thread always exists that makes progress. The much harder-to-implement, wait-free algorithm comes with a stronger guarantee that every thread always makes progress. (See: https://en.wikipedia.org/wiki/Non-blocking_algorithm)

This means that, in practice, when you have many producers that add data to the index queue and one consumer that acquires the data from the queue, it could happen that the producing threads always win and make progress, and the consumer may end up in a busy starvation loop.

One solution to this example of pop starvation could be to prioritize the pop thread over the push threads.
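To illustrate the difference between the two progress guarantees, here is a minimal sketch that is not taken from iceoryx. The function names `incrementLockFree` and `incrementSingleRmw` are hypothetical. The first uses a CAS retry loop: whenever one thread's CAS fails, another thread's CAS has succeeded, so the system as a whole makes progress, but an individual thread may retry an unbounded number of times. The second uses a single `fetch_add`, which completes in a bounded number of steps on hardware with a native atomic add (this is an assumption about the target platform; on some architectures `fetch_add` is itself compiled to a retry loop).

```cpp
#include <atomic>
#include <cstdint>

// Lock-free increment (hypothetical helper): a failed CAS means some other
// thread changed the counter, so overall progress is guaranteed, but this
// particular thread may have to retry many times under contention.
inline std::uint64_t incrementLockFree(std::atomic<std::uint64_t>& counter, std::uint64_t& retries)
{
    std::uint64_t expected = counter.load(std::memory_order_relaxed);
    while (!counter.compare_exchange_weak(
        expected, expected + 1U, std::memory_order_relaxed, std::memory_order_relaxed))
    {
        // the failed CAS has already reloaded 'expected' with the current value
        ++retries;
    }
    return expected; // value the counter had before our increment
}

// Single read-modify-write (hypothetical helper): no retry loop in user code.
// Bounded only if the platform provides a native atomic add instruction.
inline std::uint64_t incrementSingleRmw(std::atomic<std::uint64_t>& counter)
{
    return counter.fetch_add(1U, std::memory_order_relaxed);
}
```

Under heavy contention the `retries` count of the first variant can grow without bound for an unlucky thread, which is the busy starvation loop described above.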

Would you be able to explain your problem in a bit more detail, maybe with some pseudo code? We also have other constructs available in iceoryx that may help in your particular setup, and we could recommend one once we know a bit more about the context.

Xcliu commented 1 month ago

@elfenpiff Thanks for your reply!

It appears that if three threads attempt to pop from an MPMCIndexQueue and no threads are pushing to it, only one of the pop threads will make progress, while the remaining two will block until new data is pushed to the MPMCIndexQueue. Is this understanding correct?

In the pub/sub mode of iceoryx, each subscriber has a dedicated receiving buffer (LockFreeQueue). The publisher pushes messages to its history queue when no subscribers are present, and only pushes to the receiving buffer when a subscriber is available (pushing to the LockFreeQueue corresponds to popping from the MPMCIndexQueue). This approach may reduce the likelihood of blocking.

In my use case, we have the following scenario:

iox::concurrent::ResizeableLockFreeQueue<T, kMaxCapacity> buffer;

// Approximately 10 producer threads, with one of them having a high priority (e.g., RR-19).
// The high-priority thread and one of the ordinary threads are blocked for about 2 seconds.
buffer.push(item);

// There is only one consumer thread.
T item;
buffer.pop(item);

elfenpiff commented 1 month ago

@Xcliu

It appears that if three threads attempt to pop from an MPMCIndexQueue and no threads are pushing to it, only one of the pop threads will make progress, while the remaining two will block until new data is pushed to the MPMCIndexQueue. Is this understanding correct?

No, in this case all threads should run without any obstructions and return false.

// Approximately 10 producer threads, with one of them having a high priority (e.g., RR-19).
// The high-priority thread and one of the ordinary threads are blocked for about 2 seconds.

I think that from the queue point of view this shouldn't happen. Do you have other synchronization primitives like a mutex or other lock-free/threadsafe constructs? I am asking since this seems like a priority inversion problem - it can also originate from the index queue, I just wanted to exclude other possibilities first.

Another explanation I could think of is that ordinary producer threads send with an extremely high frequency, and even though their priority is lower, the number of publishers prevents the high-prio publisher from being scheduled.

Also, this architecture would require that the consumer thread has at least the priority of the high-priority producer thread. Otherwise, the queue would be constantly full since the consumer is scheduled too rarely. To reduce contention, you could also try to split up the queue and use one queue per producer-consumer pair, 10 in your case. Here you could use the spsc_fifo. On the consumer side you would always iterate over all fifos and check which ones have something new to consume.
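The one-queue-per-producer idea could look roughly like the following sketch. `SpscRing` and `drainOnce` are hypothetical names invented for illustration; `SpscRing` is a deliberately simple stand-in and not the iceoryx spsc_fifo, whose API may differ. The consumer takes at most one item from each ring per pass, so a very busy producer cannot keep the consumer away from the other rings.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>

// Hypothetical minimal single-producer single-consumer ring buffer.
// push() may only be called by one thread, pop() only by one other thread.
template <typename T, std::size_t Capacity>
class SpscRing
{
  public:
    bool push(const T& value) noexcept
    {
        const auto write = m_write.load(std::memory_order_relaxed);
        const auto read = m_read.load(std::memory_order_acquire);
        if (write - read == Capacity)
        {
            return false; // full, nothing is evicted in this sketch
        }
        m_data[write % Capacity] = value;
        m_write.store(write + 1U, std::memory_order_release); // publish the slot
        return true;
    }

    bool pop(T& value) noexcept
    {
        const auto read = m_read.load(std::memory_order_relaxed);
        const auto write = m_write.load(std::memory_order_acquire);
        if (read == write)
        {
            return false; // empty
        }
        value = m_data[read % Capacity];
        m_read.store(read + 1U, std::memory_order_release); // free the slot
        return true;
    }

  private:
    std::array<T, Capacity> m_data{};
    std::atomic<std::uint64_t> m_write{0U};
    std::atomic<std::uint64_t> m_read{0U};
};

// Consumer side: one pass over all rings, taking at most one item per ring.
// Returns the number of items handed to 'handle'.
template <typename T, std::size_t Capacity, std::size_t Producers, typename Handler>
std::size_t drainOnce(std::array<SpscRing<T, Capacity>, Producers>& rings, Handler&& handle)
{
    std::size_t consumed = 0U;
    for (auto& ring : rings)
    {
        T item{};
        if (ring.pop(item))
        {
            handle(item);
            ++consumed;
        }
    }
    return consumed;
}
```

Since each ring has exactly one writer, producers never contend with each other on a shared write position. The trade-off is that the consumer has to poll every ring, and ordering across producers is no longer preserved.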

Xcliu commented 1 month ago

@elfenpiff

When the first CAS fails, the value of cellIsValidToRead appears to remain constant in the loop (readPosition is not updated) unless a push is executed, which updates the value of value.getCycle(). Is that right?

template <uint64_t Capacity, typename ValueType>
bool IndexQueue<Capacity, ValueType>::pop(ValueType& index) noexcept
{
    bool ownershipGained = false;
    Index value;
    auto readPosition = m_readPosition.load(std::memory_order_relaxed);
    do
    {
        value = loadvalueAt(readPosition, std::memory_order_relaxed);

        auto cellIsValidToRead = readPosition.getCycle() == value.getCycle();

        if (cellIsValidToRead)
        {
            Index newReadPosition(readPosition + 1U);
            ownershipGained = m_readPosition.compare_exchange_weak(
                readPosition, newReadPosition, std::memory_order_relaxed, std::memory_order_relaxed);
        }
        else
        {
            // readPosition is ahead by one cycle, queue was empty at value load
            auto isEmpty = value.isOneCycleBehind(readPosition);

            if (isEmpty)
            {
                return false;
            }

            readPosition = m_readPosition.load(std::memory_order_relaxed);
        }

        // readPosition is outdated, retry operation

    } while (!ownershipGained); // we leave if we gain ownership of readPosition

    index = value.getIndex();
    return true;
}

elBoberido commented 1 month ago

@Xcliu I currently don't have the full code in front of me but from the snippet you posted, readPosition is updated each time the CAS fails

Xcliu commented 1 month ago

@elfenpiff When cellIsValidToRead is true, the subsequent compare_exchange_weak operation might fail. The line "readPosition = m_readPosition.load(std::memory_order_relaxed);" will not execute. Which line will update "readPosition"?

elfenpiff commented 1 month ago

@Xcliu

When cellIsValidToRead is true, the subsequent compare_exchange_weak operation might fail. The line "readPosition = m_readPosition.load(std::memory_order_relaxed);" will not execute. Which line will update "readPosition"?

This line updates readPosition

ownershipGained = m_readPosition.compare_exchange_weak(
  readPosition, newReadPosition, std::memory_order_relaxed, std::memory_order_relaxed); 

When compare_exchange_weak fails, readPosition (passed as the expected argument) is updated with the value currently stored in m_readPosition, see: https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange

There it is described as:

bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order success,
                            std::memory_order failure ) noexcept;

Atomically compares the object representation (until C++20) or value representation (since C++20) of *this with that of expected. If those are bitwise-equal, replaces the former with desired (performs read-modify-write operation). Otherwise, loads the actual value stored in *this into expected (performs load operation).
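The effect on the expected argument can be checked in isolation with a small sketch. `CasResult` and `tryAdvance` are hypothetical names used only for this illustration. The sketch uses compare_exchange_strong so that the outcome is deterministic; compare_exchange_weak updates expected in the same way on a genuine mismatch, but may additionally fail spuriously even when the values match.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical record of what a single CAS attempt did.
struct CasResult
{
    bool succeeded;
    std::uint64_t expectedAfter; // content of 'expected' after the call
    std::uint64_t storedAfter;   // content of the atomic after the call
};

// Try to advance 'position' from 'expected' to 'expected + 1'.
// 'expected' is a local copy, so the caller sees the reloaded value via the result.
inline CasResult tryAdvance(std::atomic<std::uint64_t>& position, std::uint64_t expected)
{
    const std::uint64_t desired = expected + 1U;
    const bool ok = position.compare_exchange_strong(
        expected, desired, std::memory_order_relaxed, std::memory_order_relaxed);
    // on failure, 'expected' now holds the value that was actually stored
    return {ok, expected, position.load(std::memory_order_relaxed)};
}
```

With a stale expected value the call fails, leaves the atomic untouched, and returns the current value in `expectedAfter`. This is why the pop loop does not need a separate load after a failed CAS.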

Xcliu commented 1 month ago

@elfenpiff Thanks for your clarification! I understand the point you're making.

Xcliu commented 1 month ago

@elfenpiff @elBoberido

A question about the LockFreeQueue:

Suppose m_freeIndices is exhausted, and another push thread (T1) has taken an index but has not yet added it to m_usedIndices because T1 was preempted by a higher-priority thread, so m_usedIndices is not full. Will the current push thread then be blocked for as long as T1 is held up by the higher-priority thread?

template <typename ElementType, uint64_t Capacity>
template <typename T>
iox::cxx::optional<ElementType> LockFreeQueue<ElementType, Capacity>::pushImpl(T&& value) noexcept
{
    cxx::optional<ElementType> evictedValue;

    BufferIndex index;

    while (!m_freeIndices.pop(index))
    {
        // only pop the index if the queue is still full
        // note, this leads to issues if an index is lost
        // (only possible due to an application crash)
        // then the queue can never be full and we may never leave if no one calls a concurrent pop
        // A quick remedy is not to use a conditional pop such as popIfFull here, but a normal one.
        // However, then it can happen that due to a concurrent pop it was not really necessary to
        // evict a value (i.e. we may needlessly lose values in rare cases)
        // Whether there is another acceptable solution needs to be explored.
        if (m_usedIndices.popIfFull(index))
        {
            evictedValue = readBufferAt(index);
            break;
        }
        // if m_usedIndices was not full we try again (m_freeIndices should contain an index in this case)
        // note that it is theoretically possible to be unsuccessful indefinitely
        // (and thus we would have an infinite loop)
        // but this requires a timing of concurrent pushes and pops which is exceptionally unlikely in practice
    }

    // if we removed from a full queue via popIfFull it might not be full anymore when a concurrent pop occurs

    writeBufferAt(index, value); //&& version is called due to explicit conversion via std::move

    m_usedIndices.push(index);

    return evictedValue; // value was moved into the queue, if a value was evicted to do so return it
}

elBoberido commented 1 month ago

@Xcliu I'm not that familiar with the LockFreeQueue class but I think this comment is related to #1546