gavv opened this issue 4 months ago
I can work on this!
Thank you
I was looking through your draft PR and realized that a semaphore alone does not let us implement concurrent calls to wait_state() easily.

If two threads call wait_state(), both should block, and each one should be unblocked independently (when the state becomes what it's waiting for). Posting the semaphore once would obviously unlock only one of them.

One possible approach is to keep track of how many waiters there are and post the semaphore N times. I'm not sure whether it's possible to implement this in a race-free and lock-free way, but if it is, I think it'd be a good solution.
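A minimal sketch of the counting idea, using C++20 std primitives as stand-ins for our own classes; all names here are invented, and the comment marks the racy spot:

```cpp
#include <atomic>
#include <semaphore>

// Each waiter increments num_waiters before sleeping on wake_sem
// and decrements it after waking up (waiting side not shown).
std::atomic<long> num_waiters(0);
std::counting_semaphore<> wake_sem(0);

// Called (lock-free) by the thread that updates the state.
void signal_state_changed() {
    // Open question: a waiter that registers itself right after this
    // load() misses the post; closing that window without locks is
    // exactly the race-freedom concern above.
    long n = num_waiters.load(std::memory_order_acquire);
    if (n > 0) {
        wake_sem.release(n); // wake all currently registered waiters
    }
}
```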
Another possible approach is a bit more fancy, but I think it's workable. The signaling thread (the one that updates state) keeps the same implementation as now; only the waiting thread changes. Waiting now looks like this:
1. First, we lock a mutex shared between all waiting threads (but not the signaling thread, to keep it lock-free).
2. After locking the mutex, we check whether the semaphore is free.
3. If the semaphore was free, we mark it as non-free, unlock the mutex, and sleep on the semaphore. When the semaphore is triggered, we check whether the updated state matches the mask and either exit or repeat the loop. In addition, we trigger a condition variable (using its broadcast() method).
4. If the semaphore was not free, it means another thread is already waiting on it and is guaranteed to trigger the condition variable. So we block on the condition variable instead. When it's triggered, we check whether the updated state matches the mask and either exit or repeat the loop.
In other words, only one thread waits on the semaphore; if there are multiple concurrent waiters, the others wait on the condition variable, and the one that waits on the semaphore notifies them.

This way the mutex and condition variable are used only on the waiting side, and the signaling side remains lock-free.

The downside is a trickier implementation and a performance hit when there is more than one waiter, because of the additional scheduling delays. However, the most important path to optimize is the single-waiter case, so that's not very critical.
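A minimal sketch of this scheme, again with C++20 std primitives as stand-ins and invented names; the signaling side is unchanged, only the waiting side is interesting:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <semaphore>

class HybridWaitSketch {
public:
    void wait_state(unsigned mask) {
        for (;;) {
            if (state_.load(std::memory_order_acquire) & mask) {
                return;
            }
            std::unique_lock<std::mutex> lock(wait_mutex_);
            if (!sem_busy_) {
                // Leader: the semaphore is free, so sleep on it directly.
                sem_busy_ = true;
                lock.unlock();
                sem_.acquire();
                lock.lock();
                sem_busy_ = false;
                // Wake everyone parked on the condvar, so that each thread
                // re-checks its mask and a new leader gets elected.
                cond_.notify_all();
            } else {
                // Follower: the leader is guaranteed to notify us.
                cond_.wait(lock);
            }
        }
    }

    // Signaling side: same as the single-waiter version, still lock-free.
    void set_state(unsigned new_state) {
        state_.store(new_state, std::memory_order_release);
        sem_.release();
    }

private:
    std::atomic<unsigned> state_ { 0 };
    std::counting_semaphore<> sem_ { 0 };
    std::mutex wait_mutex_;            // shared by waiting threads only
    std::condition_variable cond_;
    bool sem_busy_ { false };          // guarded by wait_mutex_
};
```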
Please let me know your thoughts and whether you have any better ideas than these two.
Some additional background
The reason why we must support concurrent calls is that wait_state() would be called from roc_sender_poll() and roc_receiver_poll() - functions exposed via the C API. Our API is fully thread-safe and concurrent, and we allow multi-threaded usage for all calls. Though the most typical use case is a single concurrent call, other use cases are possible too.

It is also the reason why we must not use a mutex (and thus a condition variable) from the signaling thread. The signaling thread is the one that performs real-time I/O. Waiting, on the other hand, may be invoked by the user from any low-priority thread, and we should make sure that user calls can never block the real-time thread.
Hi @gavv, thanks for the feedback. I haven't pushed them yet, but I've made some changes that align more with the first approach (calling sem_post N times based on the number of waiters).

Currently I'm using an array of semaphores and an array of waiter counters, where the size of each array is the number of states, and each semaphore/counter corresponds to a given state. I wasn't sure how to selectively signal some waiting threads but not others based on which state they were waiting on, without using multiple semaphores (and counters). I just wanted to check whether the multiple-semaphores/counters idea makes sense, or if there was a simpler way with a single semaphore that you had in mind in the first approach you mentioned.
I don't get it: if you have one semaphore per state, how do you wait for several states simultaneously? For example, if the mask is PAUSED|IDLE, we should wait until we enter either of these two states.
Maybe I'm missing something?
Actually, since state changes are quite rare, I think we could allow spurious wakeups to simplify the implementation. If the state changes and there are waiter(s), no matter what exact state(s) they're waiting for, we trigger the semaphore. When a waiting thread wakes up, it checks whether the state matches its mask. If not, it repeats the wait.
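Roughly like this (a self-contained sketch with stand-in C++20 std primitives and invented names, not the real API):

```cpp
#include <atomic>
#include <semaphore>

class StateWaitSketch {
public:
    void wait_state(unsigned mask) {
        num_waiters_.fetch_add(1, std::memory_order_acq_rel);
        while (!(state_.load(std::memory_order_acquire) & mask)) {
            sem_.acquire(); // wakeup may be spurious; the loop re-checks
        }
        num_waiters_.fetch_sub(1, std::memory_order_acq_rel);
    }

    void set_state(unsigned new_state) {
        state_.store(new_state, std::memory_order_release);
        // Wake all current waiters, no matter which states they want;
        // each of them re-checks its own mask after waking up.
        long n = num_waiters_.load(std::memory_order_acquire);
        if (n > 0) {
            sem_.release(n);
        }
    }

private:
    std::atomic<unsigned> state_ { 0 };
    std::atomic<long> num_waiters_ { 0 };
    std::counting_semaphore<> sem_ { 0 };
};
```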
Background
StateTracker is a small class that holds counters of active sessions and pending packets and computes the current pipeline state from those counters: if there are any sessions or packets, the state is ACTIVE, otherwise it's IDLE.

StateTracker may be used from both realtime and non-realtime threads, so its methods should not block each other, to avoid priority inversion problems. This is achieved by using atomics.
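A toy illustration of the idea, with invented names (the real class has more counters and methods): the state is never stored directly, it's derived from lock-free counters.

```cpp
#include <atomic>

enum State { IDLE, ACTIVE };

class StateTrackerSketch {
public:
    // Any active session or pending packet means ACTIVE.
    State get_state() const {
        if (num_sessions_.load(std::memory_order_acquire) > 0
            || num_packets_.load(std::memory_order_acquire) > 0) {
            return ACTIVE;
        }
        return IDLE;
    }

    // Counter updates are plain atomic increments/decrements,
    // so they never block each other.
    void register_session() { num_sessions_.fetch_add(1, std::memory_order_acq_rel); }
    void unregister_session() { num_sessions_.fetch_sub(1, std::memory_order_acq_rel); }

private:
    std::atomic<long> num_sessions_ { 0 };
    std::atomic<long> num_packets_ { 0 };
};
```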
Task
In addition to get_state(), which just reads the atomic counters and returns the state (ACTIVE or IDLE), we now need to add a new method: bool wait_state(unsigned state_mask, core::nanoseconds_t deadline). We need it for #677.

This method should block until the state becomes any of the states specified by the mask, or the deadline expires. E.g. if the mask is ACTIVE | PAUSED, it should block until the state becomes either ACTIVE or PAUSED. (Currently only two states are used, but more states will be needed later.) The deadline should be an absolute timestamp.

We can't use a mutex and condition variable, because when wait_state() is called from a non-realtime thread, it should never block (even briefly) other methods of StateTracker, as they are called from realtime threads.

We can implement it using a semaphore + atomics, the same way as we do in core::Timer. The idea is pretty simple: wait_state() blocks on a semaphore in a loop and repeats the loop until the condition is met. Methods that update the counters trigger the semaphore (which is a lock-free operation).
To reduce spurious wakeups, those methods should also avoid triggering the semaphore in two cases: when they can be sure that nobody is waiting, and when they can be sure that the new state is not one being waited for. This can be achieved with additional atomics.
Concurrent calls to wait_state() should be allowed too.
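A possible shape of all this, sketched with C++20 std primitives and invented names; deadline handling and the cleanup of the waiting mask are left out, and this is not race-verified, just an illustration of the two extra atomics:

```cpp
#include <atomic>
#include <semaphore>

class StateTrackerSketch {
public:
    // Deadline handling omitted; could use sem_.try_acquire_until().
    void wait_state(unsigned state_mask) {
        num_waiters_.fetch_add(1, std::memory_order_acq_rel);
        waiting_mask_.fetch_or(state_mask, std::memory_order_acq_rel);
        while (!(state_.load(std::memory_order_acquire) & state_mask)) {
            sem_.acquire();
        }
        num_waiters_.fetch_sub(1, std::memory_order_acq_rel);
        // NB: clearing waiting_mask_ is non-trivial with concurrent
        // waiters (others may still need the same bits); omitted here.
    }

    // Invoked by the counter-updating methods after a state change.
    void signal_state_changed(unsigned new_state) {
        state_.store(new_state, std::memory_order_release);
        // Skip the post if nobody is waiting, or if the new state
        // can't match any waiter's mask.
        if (num_waiters_.load(std::memory_order_acquire) != 0
            && (waiting_mask_.load(std::memory_order_acquire) & new_state)) {
            sem_.release(num_waiters_.load(std::memory_order_acquire));
        }
    }

private:
    std::atomic<unsigned> state_ { 0 };
    std::atomic<unsigned> waiting_mask_ { 0 };
    std::atomic<long> num_waiters_ { 0 };
    std::counting_semaphore<> sem_ { 0 };
};
```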
Testing
StateTracker should be covered with unit tests (currently it doesn't have any). Tests should be added here. An example of tests for blocking multithreaded code can be found here.