luxonis / XLink

A cross-platform library for communicating with devices over various physical links.
Apache License 2.0

Resolves a possible race to streamDesc_t entry #32

Closed themarpe closed 2 years ago

themarpe commented 2 years ago

An attempt/comment on some discussion from #29

After the semaphore is posted, the memset to 0 could occur (but not yet the stream.id = INVALID_STREAM_ID assignment). A thread blocked in e.g. getStreamById wouldn't notice this and would continue, returning the actual streamDesc_t*, which is already "closed".

These changes try to address that by setting the stream id to INVALID_STREAM_ID before posting the semaphore and not changing it afterwards. Also, the blocked thread in getStreamById rechecks the actual ID afterwards to see whether the stream was closed while it was waiting for the semaphore.

CC: @diablodale
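
A minimal sketch of the ordering described above, replayed on a single thread so it stays deterministic. The stream_t struct, the function names, and the INVALID_STREAM_ID value are simplified stand-ins chosen for illustration, not XLink's actual definitions, and it uses plain POSIX semaphores rather than XLink_sem_t.

```c
#include <assert.h>
#include <semaphore.h>
#include <stddef.h>
#include <stdint.h>

#define INVALID_STREAM_ID 0xDEADDEADu /* placeholder value for this sketch */

/* Simplified stand-in for streamDesc_t: only the fields the sketch needs. */
typedef struct {
    uint32_t id;
    sem_t sem; /* value 1 = free, 0 = held by some thread */
} stream_t;

/* Closing side (T2): invalidate the id BEFORE posting, then leave it alone. */
void close_stream(stream_t *s) {
    assert(s != NULL);
    s->id = INVALID_STREAM_ID;
    sem_post(&s->sem);
}

/* Lookup side (T1): once the semaphore is acquired, re-check the id,
 * because the stream may have been closed while this thread was waiting. */
stream_t *lookup_after_wait(stream_t *s, uint32_t wanted) {
    if (sem_wait(&s->sem) != 0) return NULL;
    if (s->id != wanted) {
        sem_post(&s->sem); /* hand the semaphore back; the stream is gone */
        return NULL;
    }
    return s; /* caller releases the semaphore when done */
}

/* Replays the interleaving: T2 holds the semaphore and closes stream 7
 * while T1 is (conceptually) blocked on it.
 * Returns 1 if T1 correctly sees the stream as closed. */
int demo_close_while_waiting(void) {
    stream_t s;
    stream_t *got;
    s.id = 7;
    if (sem_init(&s.sem, 0, 0) != 0) return -1; /* 0: T2 currently holds it */
    close_stream(&s);                           /* T2 */
    got = lookup_after_wait(&s, 7);             /* T1 unblocks here */
    sem_destroy(&s.sem);
    return got == NULL;
}
```

Posting the semaphore back when the recheck fails is a choice made for this sketch; whether that is appropriate in XLink depends on how closed slots are reused.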

diablodale commented 2 years ago

In the PR merged into develop, INVALID_STREAM_ID is set while within semaphore protection. Your scenario in OP doesn't match that code. Would you please clarify?

https://github.com/luxonis/XLink/blob/8665883c79c196c4720421479bf7aa74a0f1ca47/src/shared/XLinkDispatcherImpl.c#L488-L492

diablodale commented 2 years ago

Are you exploring thread-safety for connections and streams? If that is your goal...then I refer back to my previous discussion/post about partnering with Intel to rearchitect the thread-safety because there are deep rooted issues. The quick mutex you added in PR #30 fixes only a small subset of the issues. This is why I continue to suggest a rule/doc of "only one thread can operate on a stream simultaneously".

When code calls getStreamById() there is only protection against another thread "getting" the same stream at the same time. So to protect that narrow set of scenarios, there should be a call to XLink_sem_wait(&link->availableStreams[stream].sem) before mutating/writing to that array entry. A multiread+singlewrite mutex would add more protection. Yet when increasing protection this way, it cascades code in many places which, I think, starts to be the same amount of effort as rearchitecting/fixing the core thread-safety issues.

https://github.com/luxonis/XLink/blob/8665883c79c196c4720421479bf7aa74a0f1ca47/src/shared/XLinkDispatcherImpl.c#L458-L466 is just one example of some threading issues. That example doesn't crash -- instead later logic finally catches the bad scenario and cascades back errors (these can be seen in xlink debug/trace level logging).

themarpe commented 2 years ago

> Are you exploring thread-safety for connections and streams?

No, don't want to dive too deep.

> In the PR merged into develop, INVALID_STREAM_ID is set while within semaphore protection. Your scenario in OP doesn't match that code. Would you please clarify?

The main intention is to just cover that interaction between XLinkResetRemoteTimeout and any pending events. Scenario in mind is the following:

So what the following wants to avoid is:

  1. After the sem is posted in ResetTimeout (T2), T1 should check whether the stream was "invalidated"; that's the reason for the additional check
  2. Bringing the memset under sem_wait/sem_post so as not to cause a scenario where:
    • T2 posts the sem and memsets the streamId to 0 (then gets rescheduled before setting INVALID_ID again)
    • T1 unblocks and checks for INVALID_ID, but as the id is not invalid but rather 0, it returns the streamDesc_t*; the whole struct is zeroed out, and using it might cause issues

diablodale commented 2 years ago

I've been working on my response. Trying to simplify my response, but it is so complex and long. Please give me 24 hrs to work/reduce it more...

diablodale commented 2 years ago

Yes... and... 😂

I recommend doing this PR plus the following...

  1. Add to both getStreamById() and getStreamByName() a check for the return of XLink_sem_wait() and return NULL if error e.g. XLINK_RET_ERR_IF(XLink_sem_wait(...), NULL);
  2. Both getStreamById() and getStreamByName() need to do that XLINK_RET_ERR_IF(XLink_sem_wait(...), NULL); before they check any field of availableStreams[stream] such as id or name. Because another thread could alter those fields while the thread in these functions is comparing those fields without semaphore protection
  3. Remove or deprecate the function XLinkStreamReset() to prevent unsafe usage.
  4. Remove my one line XLINK_RET_ERR_IF(id == INVALID_STREAM_ID, NULL); on line 68 of getStreamById() in XLinkPrivateFields.c. It no longer has good value.
  5. Investigate XLinkDispatcherImpl.c line 479. That line, if (!stream) {, will always be false. This code is useless...or...not doing what a dev intended. Why? Because link is a valid pointer that points to a fully allocated xLinkDesc_t struct, and the member availableStreams[32] of that struct is a fully allocated array streamDesc_t availableStreams[XLINK_MAX_STREAMS], not streamDesc_t* availableStreams[XLINK_MAX_STREAMS]. It can never be a null pointer. More useful would be to delete lines 479-480 and put one line of code after the XLink_sem_wait() like if (stream->id == INVALID_STREAM_ID) {XLink_sem_post(&stream->sem); continue;}
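
To illustrate point 5, here is a small sketch of why a null check on the address of an array element can never trigger, and what an id-based skip looks like instead. The link_t and stream_t types, MAX_STREAMS, and the INVALID_STREAM_ID value are simplified placeholders for this example, not the real xLinkDesc_t layout.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_STREAMS 32
#define INVALID_STREAM_ID 0xDEADDEADu /* placeholder value for this sketch */

typedef struct {
    uint32_t id;
} stream_t;

/* The streams are stored by value inside the link, not as pointers. */
typedef struct {
    stream_t availableStreams[MAX_STREAMS];
} link_t;

/* Walks all slots the way a cleanup loop would.
 * Returns how many slots hold a live stream; *null_hits counts how often
 * the pointer check fired (it cannot, for a valid link). */
int count_live_streams(link_t *link, int *null_hits) {
    int live = 0;
    assert(link != NULL && null_hits != NULL);
    *null_hits = 0;
    for (int i = 0; i < MAX_STREAMS; i++) {
        stream_t *stream = &link->availableStreams[i];
        if (!stream) {            /* address of an array element: never NULL */
            (*null_hits)++;
            continue;
        }
        if (stream->id == INVALID_STREAM_ID) {
            continue;             /* the check that actually skips unused slots */
        }
        live++;
    }
    return live;
}
```

With two slots given valid ids and the rest set to INVALID_STREAM_ID, the function reports two live streams and the null branch is never taken.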

Notes

I believe this scenario is an example of the many problems caused by the core issues that the XLink arch is not thread-safe. Since the base of XLink is not thread-safe, it leads to all these scattered boundary cases. And fixing one boundary case ripples code behavior; often exposing another problem. Your PR + my recommendations is probably not worse. Yet, I do not have high confidence this is measurably "better". 🤷‍♀️

themarpe commented 2 years ago

Pushed the new changes.

> Both getStreamById() and getStreamByName() need to do that XLINK_RET_ERR_IF(XLink_sem_wait(...), NULL); before they check any field of availableStreams[stream] such as id or name. Because another thread could alter those fields while the thread in these functions is comparing those fields without semaphore protection

This change will most likely affect other things as it will eat up one semaphore notification on every stream.

> Investigate line XLinkDispatcherImpl.c line 479...

Removed. Yeah, not sure - maybe they meant to do the getStreamById initially or something like that where it may return a null... Otherwise added a check for INVALID_STREAM_ID beforehand, as otherwise doing operations on non initialized semaphore would be UB.

> Your PR + my recommendations is probably not worse. Yet, I do not have high confidence this is measurably "better".

I agree - the use of the (recently added) XLinkResetRemoteTimeout breaches a bit into the land where the dispatcher thread is the only "actor". If it causes any issues I'll revisit that, but these changes seem appropriate either way.

(Addition - just tested, it seems that previous sem_wait had an issue in this very regard - as it cycled through all streams even those that weren't created, it was waiting at the semaphore indefinitely (the Scheduler threads piled up on consecutive reconnects). So having a check for it beforehand fixes UB in old behavior because of sem_destroy and in new changes properly skips the uninitialized streams. Seems like the idea really was to get a "valid stream" or null otherwise, with that 478 line, to be able to skip over if null)

themarpe commented 2 years ago

It seems like closeLink is accessed separately by the new XLinkResetRemoteTimeout OR DispatcherWaitEventComplete.... The DispatcherWaitEventComplete... however seems to call dispatcherReset only if getting the event semaphore or waiting on it fails.

I think XLinkResetRemoteTimeout is the outlier here as the rest seems to be happening in scheduler thread only (stream accesses, etc...)

> PR conversation from the issue of incorrect/bug XLink_Semaphore. TL;DR...xlink is improperly using a semaphore with streams when it should be using a mutex. A separate issue on this is pending.

Not sure if related to above mentioned point, let me know.

Anyway, maybe the best option would be to revert to the behavior where XLinkResetRemoteTimeout doesn't call closeLink by itself but adds an event (as usual) and, in the timeout case, unblocks the scheduler instead so it can clean itself up in its own thread. If that fails, maybe a similar mechanism to DispatcherWaitEventComplete... is used and as a last resort

id = getSem(pthread_self(), curr);
if (id == NULL || XLink_sem_wait(id)) {
    dispatcherReset(curr);
}

is called.

themarpe commented 2 years ago

Removed external thread access in XLinkResetRemoteTimeout to closeLink. It now just unblocks the scheduler and lets the scheduler clean up on its own.

> I request we separate/isolate so to resolve this specific PR scenario without expanding work. Yes...rearchitecting xlink thread safety for connections+streams could include a mutex for streams. I would prefer not to discuss that here as I have a separate long issue of bugs I've found, code examples, etc. That is in the realm of rearchitecting and would cascade work to other places outside the scope of this PR.

I agree, I'll be merging this as the scope is now reduced to only fixing the scheduler cleanup - let's move the rest of the aspects into a separate issue/PR

diablodale commented 2 years ago

This is an interesting shift in approach. Perhaps related to my earlier forced close() experience https://github.com/luxonis/depthai-core/issues/257#issuecomment-974183371

I updated some code comments. I left all my previous comments on dispatcherCloseLink() because of the below. I do not understand all of the dispatcher/event pump, so I could be wrong on the following...🤷‍♀️

My attempt to understand event pump

This PR's new approach wants to change close/shutdown behavior so that only one thread mutates the streams, stream packets, and their structs at any one moment in time.

XLinkInitialize() or XLinkConnect() call DispatcherStart() that creates and detaches a thread to run eventSchedulerRun(). In eventSchedulerRun() another thread is spawned to run eventReader() in a while loop until it receives XLINK_RESET_REQ event and sets curr->resetXLink = 1.

When resetXLink=1 and the loop ends, the thread is joined back to code in eventSchedulerRun() and calls little "d" dispatcherReset(). dispatcherReset() locks mutex reset_mutex and calls function pointer glControlFunc->closeLink which is actually dispatcherCloseLink().

Then reset_mutex is enforcing the rule that only one thread at a moment in time can call dispatcherReset() or dispatcherDeviceFdDown(). dispatcherReset() is the only thing that calls dispatcherCloseLink(). So that means only one thread can be running dispatcherCloseLink() at the same time.
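
The serialization that reset_mutex provides can be sketched with a plain pthread mutex. Everything here (dispatcher_reset, close_link, the counters) is a hypothetical stand-in written for illustration; it only shows that when one mutex guards the sole caller of a function, that function never runs on two threads at once.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

static pthread_mutex_t reset_mutex = PTHREAD_MUTEX_INITIALIZER;
static int inside = 0;      /* threads currently inside the critical section */
static int close_calls = 0; /* total calls to close_link() */

/* Stand-in for dispatcherCloseLink(): only ever called under reset_mutex. */
void close_link(void) {
    close_calls++;
}

/* Stand-in for dispatcherReset(): the mutex admits one thread at a time. */
void dispatcher_reset(void) {
    pthread_mutex_lock(&reset_mutex);
    inside++;
    assert(inside == 1); /* invariant: nobody else is in here with us */
    close_link();
    inside--;
    pthread_mutex_unlock(&reset_mutex);
}

void *reset_worker(void *arg) {
    (void)arg;
    for (int i = 0; i < 1000; i++) {
        dispatcher_reset();
    }
    return NULL;
}

/* Runs two threads that both hammer dispatcher_reset().
 * Returns the total number of close_link() calls, or -1 on thread errors. */
int run_two_resetters(void) {
    pthread_t a, b;
    if (pthread_create(&a, NULL, reset_worker, NULL) != 0) return -1;
    if (pthread_create(&b, NULL, reset_worker, NULL) != 0) {
        pthread_join(a, NULL);
        return -1;
    }
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return close_calls;
}
```

Note what the mutex does not do: it serializes the two resetters against each other, but it gives no protection against a third thread that touches the same stream state without taking reset_mutex.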

This PR now calls dispatcherDeviceFdDown() in some scenarios which eventually closes the OS-level USB/tcpip connection. I ?suspect? somewhere it will cause an error to read from USB and will cascade to eventually end the eventReader() thread leading to dispatcherReset().

It is possible for the main app thread to call dispatcherReset() through chains like: XLinkConnection::close() calls XLinkResetRemoteTimeout() calls DispatcherWaitEventCompleteTimeout() calls dispatcherReset(). Only reset_mutex protects dispatcherReset(). That means that only reset_mutex protects dispatcherCloseLink() line 480 if(stream->id == INVALID_STREAM_ID) {.

Questions (based on my incomplete understanding of event pump)

  1. It is unclear to me how this works when XLinkDispatcher.c checks and restarts the while loop on hardware errors before it checks for an XLINK_RESET_REQ event. Is this a race condition if the hardware fails/closed before the XLINK_RESET_REQ is processed? Since the event loop is on one thread... and the main app is on another thread... is it possible to create that race condition since APIs like XLinkConnection::close() calls XLinkResetRemoteTimeout() calls DispatcherWaitEventCompleteTimeout() calls dispatcherReset(). In that condition, it is true that only one thread calls dispatcherReset(). Yet, would one thread be stuck looping between lines 599-610 of eventReader() in XLinkDispatcher.c due to the hardware failure and never reach the test for XLINK_RESET_REQ event?

  2. stream->id = INVALID_STREAM_ID; appears three times in code. Two of those are in the handling of events XLINK_CLOSE_STREAM_REQ and XLINK_CLOSE_STREAM_RESP. That implies that the event thread runs that stream->id = INVALID_STREAM_ID; code. Before id is set, each event code calls getStreamById() and therefore it holds the semaphore for that stream. BUT... there is no protection as it relates to dispatcherCloseLink() line 480. These two events (processed by the event thread) can set id=INVALID_STREAM_ID at the same time as the main app thread calls XLinkConnection::close() calls XLinkResetRemoteTimeout() calls DispatcherWaitEventCompleteTimeout() calls dispatcherReset() calls dispatcherCloseLink() line 480. I still believe my two code comments apply: move the INVALID_STREAM_ID test inside the protection, and always check the result code of XLink_sem_wait().

  3. And while I was investigating question 2, I saw that those two events set stream->id = INVALID_STREAM_ID; and then call releaseStream(stream);. But releaseStream() will fail because the event code set id=INVALID_STREAM_ID. Is this a problem? The semaphore will never be incremented back to 1, therefore the stream struct slot will never again be usable; it is forever lost/leaked.

  4. To remove (or not add) protection to getStreamById() and getStreamByName() requires that only one thread (typically the single event pump thread) calls those APIs. When I search the codebase, that is not 100% true. The public api XLinkGetFillLevel() calls getStreamById(). This is not thread-safe. I don't see any caller of this function. Perhaps delete or deprecate it?
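
The leak raised in question 3 can be reproduced in a few lines with a POSIX semaphore. This is a sketch under stated assumptions: release_stream() below is a stand-in that refuses to post on an invalid stream, mirroring the behaviour described above rather than copying XLink's releaseStream(), and close_then_post() is just one possible alternative ordering, not the project's fix.

```c
#include <assert.h>
#include <semaphore.h>
#include <stddef.h>
#include <stdint.h>

#define INVALID_STREAM_ID 0xDEADDEADu /* placeholder value for this sketch */

typedef struct {
    uint32_t id;
    sem_t sem; /* 1 = slot free, 0 = slot held */
} stream_t;

/* Stand-in for releaseStream(): rejects streams already marked invalid. */
int release_stream(stream_t *s) {
    assert(s != NULL);
    if (s->id == INVALID_STREAM_ID) return -1;
    return sem_post(&s->sem);
}

/* Order described in question 3: invalidate first, then release.
 * The release is rejected, so the semaphore stays at 0. */
int close_then_release(stream_t *s) {
    s->id = INVALID_STREAM_ID;
    return release_stream(s);
}

/* Hypothetical alternative: invalidate, then post the semaphore directly. */
int close_then_post(stream_t *s) {
    s->id = INVALID_STREAM_ID;
    return sem_post(&s->sem);
}

/* Closes a held stream and returns the resulting semaphore value. */
int sem_value_after_close(int use_direct_post) {
    stream_t s;
    int value = -1;
    s.id = 1;
    if (sem_init(&s.sem, 0, 0) != 0) return -1; /* 0: taken by the lookup */
    if (use_direct_post) {
        close_then_post(&s);
    } else {
        close_then_release(&s);
    }
    sem_getvalue(&s.sem, &value);
    sem_destroy(&s.sem);
    return value;
}
```

A value of 0 after the close means the next thread to wait on that slot would block indefinitely, which is the "lost slot" described in the question.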

themarpe commented 2 years ago

> It is possible for the main app thread to call dispatcherReset() through chains like: XLinkConnection::close() calls XLinkResetRemoteTimeout() calls DispatcherWaitEventCompleteTimeout() calls dispatcherReset(). Only reset_mutex protects dispatcherReset(). That means that only reset_mutex protects dispatcherCloseLink() line 480 if(stream->id == INVALID_STREAM_ID) {.

Not exactly sure - seems like either of 2 things have to go wrong in that case for this to happen.

id = getSem(pthread_self(), curr);
if (id == NULL || XLink_sem_wait(id)) {
    dispatcherReset(curr);
}

I'm not sure in what state the link must be for this to occur. Would have to check when that is reached and is calling it from that state invalid.

> Yet, would one thread be stuck looping between lines 599-610 of eventReader() in XLinkDispatcher.c due to the hardware failure and never reach the test for XLINK_RESET_REQ event?

First a regular "please reset/close the link" event is sent and afterwards it is closed by closing the underlying link (to unblock a blocking read/write in case of eg. TCP link where the remote goes missing). This should assure that at the end there will be an event that closes it down.

On 2. - The dispatcherReset is again called in the weird state only - if it times out, the regular close device fd path is taken.

> I still believe my two code comments apply: move INVALID_STREAM_ID test inside the protection, and to always check the result code of XLink_sem_wait().

I'll test, but I'm highly confident that adding a semaphore wait on iterating through streams will break things. Tested - if one doesn't post afterwards, for "incorrect streams" it breaks, although the below works. Problem is still that doing so is UB as operating on non-initialized (eg INVALID_STREAM_ID) semaphores isn't valid. EDIT: XLink_sem_t actually checks, so it might be okay. EDITEDIT: closed streams won't post and it seems like it would get stuck there...

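// Take the stream's semaphore before reading any of its fields.
if (XLink_sem_wait(&link->availableStreams[stream].sem)) {
    return NULL;
}
if (link->availableStreams[stream].id == id) {
    // Match: return with the semaphore still held for the caller.
    return &link->availableStreams[stream];
} else {
    // Not the requested stream: give its semaphore back and move on.
    if (XLink_sem_post(&link->availableStreams[stream].sem)) {
        return NULL;
    }
}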

On 3. - That might be an alternative actually - that the semaphores remain valid even if INVALID_STREAM_ID is set... That'd require initing all semaphores though, so at the end all can be unconditionally closed. Regarding the question, it might make sense to post it first, but I guess if one closes the stream, they shouldn't communicate over it anymore, so the unposted semaphore shouldn't make a difference. Also the #ifndef __PC__ ... sem_destroy ... at XLinkDispatcherImpl.c:369 is interesting...

On 4. - That one could indeed be deprecated.

Which brings me back to XLinkDispatcherImpl.c:488-497 - as stream ids are "invalidated" beforehand, it seems like it's actually okay to iterate through all of them and clean up anything that remained on any stream. The current memset isn't okay, as it invalidates the semaphore - but skipping the semaphores and just doing a sem_destroy before the memset should actually be okay. This part executes when the scheduler is down already (with the exception of dispatcherReset in timeout, TBD), and destroying a XLink_sem_t is okay as it's ref counted if it's still alive or not. The sem_wait & sem_post could remain I assume, but the memset has to be moved outside

TLDR: I think adding the above snippet is actually okay (might not be, see EDITEDIT), along with moving the memset after destroying the sem in XLinkDispatcherImpl.c:488-497. Also, the invalid stream skip in the same XLinkDispatcherImpl.c:479-482 should be removed, as it otherwise doesn't destroy the semaphores of closed streams (and clean up). If the skip is removed, so should be the sem_wait/sem_post pair in the same section, as otherwise the cleanup gets stuck waiting there. Which brings us closer to the original behavior, but that's okay, as in that case the main focus then seems to be on point 2. and its state.

themarpe commented 2 years ago

Pushed another take on xlink_stream_reset_race_2 - given point 2., if the closeLink call is done after the scheduler threads are finished with their work, the synchronization in cleanup shouldn't be needed and the old way of cleaning up the streams makes sense. It also destroys all inited semaphores correctly and I've reverted the memset mistake.

https://github.com/luxonis/XLink/compare/develop...xlink_stream_reset_race_2

Still, the dispatcherReset from unsuccessful DispatcherWaitEventComplete... would have to be checked

EDIT: I'll move changes back to this branch if this checks out

diablodale commented 2 years ago

> Not exactly sure - seems like either of 2 things have to go wrong in that case for this to happen.

Things always go wrong when thousands of events/second x devices x customers x years x worlds. 😉 The error path might be statistically rare, but when billions->trillions of events are being processed...it will occur. In general, it seems the intention is to keep dispatcherReset() away from all threads except the dispatcher thread(s). I suggest this fail case align with that. If you don't want to align it in this PR, then I recommend marking it as TODO or BUGBUG for future investigation.

> First a regular "please reset/close the link" event is sent and afterwards it is closed by closing the underlying link (to unblock a blocking read/write in case of eg. TCP link where the remote goes missing). This should assure that at the end there will be an event that closes it down.

1. You wrote the quote immediately above. I don't think this is 100% guaranteed -- this is a race condition. There are at least two separate threads involved: main app thread, and the event processing thread. The main app calls XLinkConnection::close() calls XLinkResetRemoteTimeout() calls DispatcherAddEvent(XLINK_RESET_REQ) and then waits 1 nanosecond with DispatcherWaitEventCompleteTimeout(1 nanosecond). The XLINK_RESET_REQ event is in the queue and the event thread will process it whenever it wants...for example 5 seconds later. Meanwhile the main app thread times-out after 1 nanosecond and proceeds to call DispatcherDeviceFdDown(). That kills the os USB connection and has (I think) created the failure scenario of forever looping between lines 599-610 of eventReader().

2. yesyesyes 💯 The stream mutex (aka semaphore) has to be locked/unlocked on each loop. No longer a concern as the PR approach is shifting away from this approach by isolating which threads can call this (ignoring for now main app thread calling dispatcherReset).

3.a. The 32 stream semaphores (aka mutex) exist to protect one of the 32 entities of xLinkDesc_t::availableStreams[32]. The idea of having a single mutex for the whole array, or to have a mutex for each slot (like today's code of a "semaphore" sem) is part of that larger arch overhaul of thread-safety. The 32 mutexes live before/after any specific stream. And the stream struct entries in that array are reused. So the mutex/semaphore protecting a slot can be created on (xLinkDesc_t create) and destroyed on (xLinkDesc_t destroy) -- doing it on every stream open/close adds unneeded complexity.

3.b. Yes XLinkDispatcherImpl.c (365,369,380) and (417,427,431) are the two events that have the same bug. The xlink semaphore code prevents posting to a stream-sem if that stream is INVALID. The stream slot is forever leaked/lost.

4. ignoring for now as you requested the issue of main app thread calling dispatcherReset()... then... this function dispatcherCloseLink() is only called by dispatcherReset() which is only called by the thread running eventSchedulerRun() and this is a single thread for every XLinkConnect(). Each call to XLinkConnect() is what initializes the xLinkDesc_t struct. So that means that at this point-in-time, it is guaranteed only one thread has access to the xLinkDesc_t struct and its availableStreams[32] array. So there is no need to sem wait/post.

> destroying a XLink_sem_t is okay as it's ref counted if it's still alive or not. The sem_wait & sem_post could remain I assume, but the memset has to be moved outside

A caution on what you wrote -- perhaps for clarity. XLink_sem_t does not actually ref count. Meaning, it does not count/know the semaphore value itself. Instead, the XLink_sem_t ref count is actually the number of threads blocked/waiting on the semaphore. Everywhere it says "refs" it is clearer to think "waits" or "blocked" instead. The OS allows a semaphore to be destroyed safely when there are no waits. But...the app could fail. It is easily possible for thread1 to sem_wait() on the semaphore and be granted/unblocked (causing its semaphore value to be decremented) and then thread2 destroys the semaphore. Then when the thread1 calls sem_post() it will fail because the semaphore was destroyed. I don't know of a specific depthai bug regarding this, but there is some Xlink code which is fragile that I'll highlight in a separate issue.
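
The distinction between "number of waiters" and "semaphore value" can be made concrete with a toy wrapper. counted_sem_t and the cs_* functions below are invented for this illustration and are only loosely modeled on the description above, not on the real XLink_sem_t code; the walk-through runs on one thread, so the bookkeeping fields are deliberately left unguarded.

```c
#include <assert.h>
#include <semaphore.h>
#include <stddef.h>

/* Toy wrapper: tracks blocked waiters, NOT the semaphore's value. */
typedef struct {
    sem_t sem;
    int waiters;   /* threads currently inside cs_wait() */
    int destroyed; /* set once the underlying semaphore is gone */
} counted_sem_t;

int cs_init(counted_sem_t *s, unsigned int value) {
    assert(s != NULL);
    s->waiters = 0;
    s->destroyed = 0;
    return sem_init(&s->sem, 0, value);
}

int cs_wait(counted_sem_t *s) {
    int rc;
    if (s->destroyed) return -1;
    s->waiters++;            /* counted only while blocked in sem_wait */
    rc = sem_wait(&s->sem);
    s->waiters--;            /* granted: no longer counted as a "ref" */
    return rc;
}

int cs_post(counted_sem_t *s) {
    if (s->destroyed) return -1; /* nothing left to post to */
    return sem_post(&s->sem);
}

/* Destroy is allowed whenever nobody is blocked, even if a thread
 * still "holds" the semaphore (value 0) and intends to post later. */
int cs_destroy(counted_sem_t *s) {
    if (s->destroyed) return 0;
    if (s->waiters > 0) return -1;
    s->destroyed = 1;
    return sem_destroy(&s->sem);
}

/* Replays: thread1 acquires, thread2 destroys, thread1 tries to release.
 * Returns the result of the final post (-1 expected). */
int demo_post_after_destroy(void) {
    counted_sem_t s;
    if (cs_init(&s, 1) != 0) return 1;
    if (cs_wait(&s) != 0) return 2;    /* thread1: value 1 -> 0, waiters 0 */
    if (cs_destroy(&s) != 0) return 3; /* thread2: permitted, no waiters */
    return cs_post(&s);                /* thread1: fails, semaphore is gone */
}
```

A real wrapper would need its own lock around waiters and destroyed; the sketch omits that to keep the sequence of events readable.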

diablodale commented 2 years ago

FYI, xlink_stream_reset_race_2 passed a single run of full 61 test suite. It is also passing my app's quick unit test in a loop of start/stop device streaming depth+color.

themarpe commented 2 years ago

Thanks for testing it out on your end as well - I'll open a separate PR then and close this one, just so to not require a force push and to keep a bit of context of the discussion.

Regarding:

> You wrote the quote immediately above. I don't think this is 100% guaranteed -- this is a race condition. There are at least two separate threads involved: main app thread, and the event processing thread. The main app calls XLinkConnection::close() calls XLinkResetRemoteTimeout() calls DispatcherAddEvent(XLINK_RESET_REQ) and then waits 1 nanosecond with DispatcherWaitEventCompleteTimeout(1 nanosecond). The XLINK_RESET_REQ event is in the queue and the event thread will process it whenever it wants...for example 5 seconds later. Meanwhile the main app thread times-out after 1 nanosecond and proceeds to call DispatcherDeviceFdDown(). That kills the os USB connection and has (I think) created the failure scenario of forever looping between lines 599-610 of eventReader().

Good point - but I think that the scheduler still gets the event while event reader is busy looping and sets curr->resetXLink = 1; in XLinkDispatcher.c:1052 - didn't stumble upon XLink not closing down cleanly (eg eventReader continuing to busy loop), but haven't specifically tested for it as well. Will try creating a test now, by delaying the RESET_REQ event and timing out ASAP.

themarpe commented 2 years ago

(Moved to https://github.com/luxonis/XLink/pull/34)