ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Subscription to a behavior subject can miss the value of a parallel occurring on_next #512

Open PKobold opened 4 years ago

PKobold commented 4 years ago

I encountered a strange behavior when using a behavior subject (no pun intended :D) to broadcast a state change. Sometimes a subscriber never receives the value passed to an on_next call, even though it received the previous value when it subscribed.

This leads to seriously unwanted behavior: in my example you could wait forever for a state change to a certain value, which by design you should always receive.

I tracked this down and concluded that it is caused by the implementation of behavior::get_observable() in rx-behavior.hpp:

    observable<T> get_observable() const {
        auto keepAlive = s;
        return make_observable_dynamic<T>([=](subscriber<T> o){
            if (keepAlive.get_subscription().is_subscribed()) {
                o.on_next(get_value());
            }
            keepAlive.add(s.get_subscriber(), std::move(o));
        });
    }

After get_value() is called, there is a gap until the subscriber is actually added. During that gap a parallel on_next can update the state of the behavior_observer, and the new subscriber o then never receives the new value.
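To make the gap easier to see, here is a minimal sketch that reproduces it deterministically. It does not use RxCpp: mini_behavior, its subscribe() hook parameter and run_gap_demo() are hypothetical names invented for this illustration, and the hook simply stands in for a parallel on_next landing between the initial emission and the add.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a behavior subject; this is NOT RxCpp code.
class mini_behavior {
    mutable std::mutex lock_;
    int value_;
    std::vector<std::function<void(int)>> observers_;
public:
    explicit mini_behavior(int initial) : value_(initial) {}

    int get_value() const {
        std::lock_guard<std::mutex> guard(lock_);
        return value_;
    }

    void on_next(int v) {
        std::vector<std::function<void(int)>> snapshot;
        {
            std::lock_guard<std::mutex> guard(lock_);
            value_ = v;
            snapshot = observers_;
        }
        for (auto& o : snapshot) o(v);  // observers are called without the lock
    }

    // Same shape as get_observable(): emit the current value, then add.
    // `between` is a test hook (an assumption of this sketch) that runs in the gap.
    void subscribe(std::function<void(int)> o, const std::function<void()>& between) {
        o(get_value());
        between();  // a parallel on_next could land exactly here
        std::lock_guard<std::mutex> guard(lock_);
        observers_.push_back(std::move(o));
    }
};

// Returns every value the new subscriber received.
std::vector<int> run_gap_demo() {
    mini_behavior subject(1);
    std::vector<int> seen;
    subject.subscribe([&seen](int v) { seen.push_back(v); },
                      [&subject] { subject.on_next(2); });
    return seen;
}
```

Calling run_gap_demo() returns only the initial value 1, although the subject's current value is 2 by the time the subscriber is registered.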

A quick fix that works for me and stays local is to change the mutex of behavior_observer_state into a recursive mutex, make it accessible, and lock it in the lambda function above. See the following pseudo sample implementation:

class behavior_observer : public detail::multicast_observer<T>
{
    // ...
    class behavior_observer_state
    {
        // Recursive, so the subscribing thread can re-enter while holding it.
        mutable std::recursive_mutex lock;
    public:
        // ...
        std::recursive_mutex& mutex() const
        {
            return lock;
        }
    };
    // ...
public:
    // ...
    std::recursive_mutex& mutex() const
    {
        return state->mutex();
    }
};

class behavior
{
    // ...
public:
    // ...
    observable<T> get_observable() const {
        auto keepAlive = s;
        return make_observable_dynamic<T>([=](subscriber<T> o) {
            // Hold the state lock across the initial emission and the add.
            std::lock_guard<std::recursive_mutex> guard(keepAlive.mutex());
            if (keepAlive.get_subscription().is_subscribed()) {
                o.on_next(get_value());
            }
            keepAlive.add(s.get_subscriber(), std::move(o));
        });
    }
};

But maybe some of you have a better solution for this. If desired, I can provide a small test example that reproduces the problem.

Some side questions that came up while looking deeper into the implementation in rx-behavior.hpp:

  1. Why use unique_lock instead of lock_guard in behavior_observer_state? Isn't lock_guard generally preferable here, because unique_lock is the more complex object? The difference may be optimized away, but that is not guaranteed.

  2. In the current implementation of behavior::get_observable(), the keepAlive object seems redundant. The following code would also do the trick, or am I missing something? (Compare with the first code section.)

    observable<T> get_observable() const
    {
        return make_observable_dynamic<T>([=](subscriber<T> o) {
            if (s.get_subscription().is_subscribed())
            {
                o.on_next(get_value());
            }
            s.add(s.get_subscriber(), std::move(o));
        });
    }

BR Paul

PKobold commented 4 years ago

I created a pull request with a possible solution based on the pseudo code above. I also refactored the get_observable function a little. I kept the keepAlive pattern but removed all (in my view) unnecessary copies of and accesses to other objects.

kirkshoop commented 4 years ago

Thank you for the report and especially the PR!

This is a tricky point in the code. The code goes to great lengths to never hold a lock while calling on_next, on_error or on_completed. Even with a recursive mutex the on_next might wait (yes, this is bad) for work on another thread and that work might call get_observable on the subject from the second thread which would now block waiting for the first thread to release the lock.
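The hazard can be sketched without RxCpp. In the following illustration, which uses only the standard library, the function name and subject_lock are hypothetical, and a timed try-lock replaces the plain lock() so that the sketch reports the problem instead of hanging.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <mutex>

// Returns true if a second thread can take the lock while the first thread
// is still "inside on_next" with the lock held. Hypothetical illustration.
bool second_thread_can_lock_during_callback() {
    std::recursive_timed_mutex subject_lock;  // stand-in for the subject's mutex

    // Thread 1 holds the lock while it runs the subscriber's callback.
    std::lock_guard<std::recursive_timed_mutex> guard(subject_lock);

    // The callback hands work to another thread, and that work needs the
    // same lock (think: it calls get_observable() and subscribes).
    auto worker = std::async(std::launch::async, [&subject_lock] {
        if (subject_lock.try_lock_for(std::chrono::milliseconds(100))) {
            subject_lock.unlock();
            return true;
        }
        return false;  // with a plain lock() this thread would block forever
    });

    // Thread 1 waits for the worker while still holding the lock.
    return worker.get();
}
```

The function returns false: recursive ownership only helps the thread that already owns the mutex, so the second thread cannot get in until the first one leaves the callback.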

I think that the fix for this issue is more invasive. The underlying multicast_observer for behavior and replay should use a queue of values, with a per-subscription position into the queue that is used when catching a subscriber up to the latest value.
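As a rough sketch of that idea, and not a proposal for the actual RxCpp internals: queued_behavior, its members and run_queue_demo() below are hypothetical names, the value log grows without bound, and exceptions thrown by observers are ignored. Registering the subscriber and choosing its start position happen in one locked step, and values are always delivered outside the lock.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical sketch; this is NOT RxCpp code.
class queued_behavior {
    struct subscription {
        std::function<void(int)> on_next;
        std::size_t position = 0;  // index of the next value to deliver
        bool draining = false;     // true while some call is delivering
    };
    std::mutex lock_;
    std::vector<int> values_;      // value log, unbounded in this sketch
    std::vector<std::shared_ptr<subscription>> subs_;

    // Deliver everything from the subscription's position up to the latest.
    void drain(const std::shared_ptr<subscription>& s) {
        {
            std::lock_guard<std::mutex> guard(lock_);
            if (s->draining) return;  // the active drainer will pick it up
            s->draining = true;
        }
        for (;;) {
            int v;
            {
                std::lock_guard<std::mutex> guard(lock_);
                if (s->position == values_.size()) {
                    s->draining = false;
                    return;
                }
                v = values_[s->position++];
            }
            s->on_next(v);  // never called with the lock held
        }
    }
public:
    explicit queued_behavior(int initial) { values_.push_back(initial); }

    void on_next(int v) {
        std::vector<std::shared_ptr<subscription>> snapshot;
        {
            std::lock_guard<std::mutex> guard(lock_);
            values_.push_back(v);
            snapshot = subs_;
        }
        for (auto& s : snapshot) drain(s);
    }

    void subscribe(std::function<void(int)> o) {
        auto s = std::make_shared<subscription>();
        s->on_next = std::move(o);
        {
            // Registering and picking the start position are one atomic step.
            std::lock_guard<std::mutex> guard(lock_);
            s->position = values_.size() - 1;  // start at the latest value
            subs_.push_back(s);
        }
        drain(s);
    }
};

// An on_next arrives while the initial value is still being delivered.
std::vector<int> run_queue_demo() {
    queued_behavior subject(1);
    std::vector<int> seen;
    bool first = true;
    subject.subscribe([&](int v) {
        seen.push_back(v);
        if (first) { first = false; subject.on_next(2); }
    });
    return seen;
}
```

In this sketch run_queue_demo() returns 1 followed by 2: the value published during the initial emission is appended to the log and picked up by the drain loop that is already running, so it is neither lost nor delivered under a lock.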

PKobold commented 4 years ago

Hi Kirk, thank you for your reply.

I agree, and I'm not really happy with this solution anymore either. I think I may already have run into the locking problem in a nasty, rare, timing-dependent case in my working environment (I haven't had time to confirm this yet). But there are now definitely deadlock scenarios that didn't occur before. I will close the PR for now.

What I can't take from your post is what your preferred next step would be.

PKobold commented 4 years ago

Hi again,

Last week I fiddled around with this for quite a bit. At first I thought I had a proper solution that ensures subscribing can't result in a deadlock while preserving the correct state and order of the published values, regardless of which thread a parallel on_next comes from. But then I ran into deadlock problems when multiple parallel on_next calls occurred. From my understanding, either you live with the current solution or you will inevitably break existing code: you create deadlock possibilities, you introduce new synchronization or value-order problems, or, when working with some kind of synchronization queue, you need a permanent extra thread and are no longer in the caller's thread context during a simple initial subscribe, as you are now.

Also, independent of my changes, I wrote a test that in rare cases just crashes, because on_next is called in parallel with on_completed and the latter finishes first. In my case something then seems to get cleaned up, so that the lambda provided for on_next runs in an invalid memory context and crashes, even though no references are passed to the lambda.

So I thought the other language implementations must have run into the same problem, and wondered how they solved it. It seems they each do it a little differently. The two big players, Rx.NET https://github.com/dotnet/reactive/blob/master/Rx.NET/Source/src/System.Reactive/Subjects/BehaviorSubject.cs and RxJava https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/subjects/BehaviorSubject.java (2.x looks very similar), for example, also differ from each other, from what I can deduce.

.NET takes a really simple, coarse locking approach, similar to my approach when subscribing but more invasive, since subscribing there also blocks calls to on_error and on_completed from another thread. Java takes an approach closer to your direction, if I interpret it correctly, but reading threads like https://github.com/ReactiveX/RxJava/issues/1184 (even though it is closed), there still seem to be some issues. Since I don't use Java, I can't test this.

In the end I would favor the .NET approach, so I have reopened the initial pull request. Just close it if you don't want to use it. But if it is used, I want to raise the question of whether we should simplify this further and have only one lock instead of the current two (one in the behavior state and one in the underlying multicast_observer state).

There is also the question of what to do with the replay subject (we don't use it for now, so it didn't concern me). .NET uses even stronger locking there than in the behavior subject, running the calls to the observer functions entirely under the lock. See https://github.com/dotnet/reactive/blob/master/Rx.NET/Source/src/System.Reactive/Subjects/ReplaySubject.cs