ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.03k stars 390 forks source link

blocking_observable uses strange infinite-spin-wait #540

Closed quetzalcoatl closed 4 years ago

quetzalcoatl commented 4 years ago

I found this out when diagnosing a different problem. I had/have a deadlock problem on as_blocking and, while reading the code in hope of understanding what exactly happens out there, I've noticed this piece of strange synchronization in rx-observable.hpp in static auto blocking_subscribe():

    std::mutex lock;
    std::condition_variable wake;

    // ....

    auto cs = scbr.get_subscription();
    cs.add(
        [&, track](){
            // OSX geting invalid x86 op if notify_one is after the disposed = true
            // presumably because the condition_variable may already have been awakened
            // and is now sitting in a while loop on disposed
            wake.notify_one();
            track->disposed = true; // ****B
        });

    std::unique_lock<std::mutex> guard(lock);
    source.subscribe(std::move(scbr));

    wake.wait(guard,
        [&, track](){
            // this is really not good.
            // false wakeups were never followed by true wakeups so..

            // anyways this gets triggered before disposed is set now so wait.
            while (!track->disposed) { // **** A
                ++track->false_wakes;
            }
            ++track->true_wakes;
            return true;
        });
    track->wakened = true;
    // ....

This is the whole code involved. The wake,lock,etc. are not exposed anywhere else, so it's safe to assume we see their whole lifetime, and all accesses here.

The standard says that a std::condition_variable is allowed to do false wakeups and that users are obliged to protect their code against that. And in fact that occurs on Windows system a lot, where the OS tries to ping threads to prevent starving. To make things easier, that's why we have the lambda-version overload of std::condition_variable::wait( .. ) that takes a bool-returning lambda which can is used to filter-out false wakeups. Condvar guarantees owning the lock while the lambda is called, so it's safe to check any actual conditions we need and return false to make the condvar put back to sleep, or return true to say it was a proper wakeup.

Now, please note that blocking spin-wait on line marked A.

First of all, it's in the wake's condition assertion lambda. When a really false wakeup occurs (and it totally may happen on Windows), then the condition in while will be never satisfied in any reasonable time, leaving the thread spinning on 100%CPU usage, indefinitely. Not good. Or even infinitely, because while in the lambda, the lock is owned, and that may prevent other threads from accessing the resource in order to produce a proper wakeup. 'Not good' just turned into 'almost catastrophic'.

Second, why a while here at all? In normal case, there's no point in doing it, as the 'while' is build-in into the condvar's wait(lambda) overload. If the code adhered to the typical use of a condvar, it would just be:

    wake.wait(guard,
        [&, track](){
            return track->disposed;
        });

and that's it. If wakeup was false, just go back to sleep and let it wait further. If wakeup was true, alright, return true and break asap. That's how it should be, but it cannot, because..

... because "third": let's now see the line marked B. First, the condvar is notified, and only then, an atomic flag is set. It's that very same atomic flag that is asserted in the condvar's lambda.

Effects of B:

*) ok, on Windows, we could also count on getting more of false-wakeups from the OS, but.. please don't

Let me suggest a fix: better do it as the standard use of a condvar would suggest:

    auto cs = scbr.get_subscription();
    cs.add(
        [&, track](){
            track->disposed = true; // ensure that a proper wakeup will be detected as such
            wake.notify_one();
        });

    std::unique_lock<std::mutex> guard(lock);
    source.subscribe(std::move(scbr));

    wake.wait(guard,   // let the wait(lambda) do its job as intended
        [&, track](){
            if (!track->disposed) { // no spinloop, just a test
                ++track->false_wakes;
                return false;
            } else {
                ++track->true_wakes;
                return true;
            }
            // or just: "return track->disposed;" if wakeup counters are not needed
        });
    track->wakened = true;
    // ....

Regarding indefinite spinwaiting if blocking observable never completes: now there's no chance for it. Regarding spinwaiting case of false wakeups: now it's not needed, and there's none, or - on Windows - it's built-in in the condvar and is controlled by the OS's planner via OS-triggered false wakeups blazingly fast dismissed by the condvar's lambda Regarding pessimistic condition: there's no chance for it in the first place, as the flag is set before notifying, and nobody else is able to notify the condvar

I see that there are also some atomic counters that gather statitics of false/true wakeups, but in fact, here, with the spin-wait, the false stat is pretty broken. It counts the number of spins, not the number of wakeups.

I also have to admit that I totally don't get what's in the comment about OSX. That comment tries to rationalize why the notify() was put before raising the flag, giving some hint about crashes on OSX. However, that comment considers those crashes to be a result of spin-waiting in the false wakeup handler, which were not the right thing to do there in the first place. If the crashes really occurred due to the spin-waiting (which I doubt at the moment), then removing the spin-wait has quite a chance to fix them up as well. Unfortunatelly, I've got no way to check that on OSX.

nikobarli commented 4 years ago

Are you looking at an old code ? I believe the part that spin-wait on blocking subscribe has already been fixed here: https://github.com/ReactiveX/RxCpp/pull/453

quetzalcoatl commented 4 years ago

Actually, that IS possible that I'm looking at old code.. but I wouldn't suspect that.

I fetched package from VCPKG, it's marked 4.1.0, and that matched the most-recent tag as seen here on GitHub image

Earlier I was using 4.1.0 from NuGet, but some people reported problems, and it turned out that it's stale and not updated anymore ~ https://github.com/ReactiveX/RxCpp/issues/498 - and kirkshoop responded - I don't get it why it's not possible to take over one's account, or why it's not possible to just publish another nuget under a new name, but OK, I can live with that -and he said that we should use vcpkg - ok - so I tried getting it from there and assumed that this one would get updates..

It seems I was wrong then. The issue you've found clearly shows that it's fixed now, but it all also shows that maybe 4.1.0 on vcpkg is newer that the one one nuget, but is still 2yrs behind :|

Thanks for looking into this. I suppose that we I can close this (non)issue and revive #498 instead.