ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Possible data race in composite_subscription? #475

Closed. iam closed this issue 5 years ago.

iam commented 5 years ago

composite_subscription_inner looks like it might have an incorrect implementation of double-checked locking?

https://github.com/ReactiveX/RxCpp/blob/7930ccc3f5eca55c5866fc7b1d2476f46b8cec17/Rx/v2/src/rxcpp/rx-subscription.hpp#L245

        inline weak_subscription add(subscription s) {
            if (!issubscribed) {
                s.unsubscribe();
            } else if (s.is_subscribed()) {
                std::unique_lock<decltype(lock)> guard(lock);          // <<< issubscribed is not re-checked once the lock is held
                subscriptions.insert(s);
            }
            return s.get_weak();
        }

By the time the guard lock is acquired, issubscribed could already be false, so the insert races against either clear() or unsubscribe():

   Thread 1                                Thread 2
-----------------------------------------------------------
   add(s)
     sees issubscribed == true
                                           clear() or unsubscribe()
                                             lock
                                             call unsubscribe for all
                                             unlock
     block-for-lock
 --> insert into set
     unlock

Likely correct code, which re-checks issubscribed after acquiring the lock (added lines marked /*+*/):

        inline weak_subscription add(subscription s) {
            if (!issubscribed) {
                s.unsubscribe();
            } else if (s.is_subscribed()) {
                std::unique_lock<decltype(lock)> guard(lock);
  /*+*/         if (!issubscribed.load()) {
  /*+*/             s.unsubscribe();
  /*+*/         } else {
                    subscriptions.insert(s);
                }
            }
            return s.get_weak();
        }

The same potential issue seems to recur in most of the lock usages (add, remove, clear). unsubscribe is the exception: it is fine because it uses an atomic exchange on issubscribed.
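To make the double-checked pattern concrete, here is a minimal, self-contained sketch. It does not use RxCpp: toy_subscription, toy_composite and all_children_unsubscribed are hypothetical names, and a child is modeled as a shared_ptr to a flag. add() re-checks issubscribed after taking the lock, and unsubscribe() flips the flag with an atomic exchange before draining the set, so a child that loses the race is still unsubscribed by one side or the other. One design choice differs from the proposed fix: the late child is unsubscribed after releasing the lock, on the assumption that a child's unsubscribe might call back into the composite.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical stand-in for a subscription; only tracks a subscribed flag.
struct toy_subscription {
    std::atomic<bool> subscribed{true};
    void unsubscribe() { subscribed.store(false); }
    bool is_subscribed() const { return subscribed.load(); }
};
using toy_ptr = std::shared_ptr<toy_subscription>;

// Hypothetical composite (not RxCpp) using double-checked locking in add().
class toy_composite {
public:
    void add(const toy_ptr& s) {
        if (!issubscribed.load()) {      // first check, without the lock
            s->unsubscribe();
            return;
        }
        if (!s->is_subscribed()) return; // nothing to track
        std::unique_lock<std::mutex> guard(lock);
        if (!issubscribed.load()) {      // second check, with the lock held
            guard.unlock();              // assumption: call out without the lock
            s->unsubscribe();
        } else {
            subscriptions.insert(s);     // unsubscribe() will see it when it drains
        }
    }

    void unsubscribe() {
        if (!issubscribed.exchange(false)) return;  // only the first caller drains
        std::set<toy_ptr> local;
        {
            std::lock_guard<std::mutex> guard(lock);
            local.swap(subscriptions);   // leaves the member empty, not moved-from
        }
        for (const auto& s : local) s->unsubscribe();
    }

    bool is_subscribed() const { return issubscribed.load(); }

private:
    std::atomic<bool> issubscribed{true};
    std::mutex lock;
    std::set<toy_ptr> subscriptions;
};

// Smoke test: race add() against unsubscribe() and check that every child
// ends up unsubscribed, whichever side wins.
inline bool all_children_unsubscribed(int n) {
    toy_composite cs;
    std::vector<toy_ptr> children;
    for (int i = 0; i < n; ++i) children.push_back(std::make_shared<toy_subscription>());
    std::thread adder([&] { for (const auto& c : children) cs.add(c); });
    std::thread closer([&] { cs.unsubscribe(); });
    adder.join();
    closer.join();
    for (const auto& c : children)
        if (c->is_subscribed()) return false;
    return true;
}
```

If the second check were removed, all_children_unsubscribed could occasionally return false. A single passing run cannot prove a race absent, so treat it as a smoke test rather than a proof.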

remove() also seems to have an additional data race involving w:

        inline void remove(const weak_subscription& w) {
            if (issubscribed && !w.expired()) {
                auto s = subscription::lock(w);                 /// <<< (2) w could already be expired here
                std::unique_lock<decltype(lock)> guard(lock);   /// <<< (1) issubscribed could now be false
                subscriptions.erase(s);                         /// calls erase on a moved-from set; could be benign
            }
        }

Calling subscription::lock on an expired subscription would then call std::terminate (which, incidentally, seems like a strange API decision), but at least the program would crash immediately instead of corrupting data.
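A sketch of a remove() that avoids both problems, again on hypothetical types rather than RxCpp's (child, remove_demo and its methods are illustrative names). std::weak_ptr::lock() returns an empty shared_ptr for an expired pointer instead of terminating, and issubscribed is checked again once the lock is held.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <set>

// Hypothetical child type standing in for a subscription's shared state.
struct child {};
using child_ptr = std::shared_ptr<child>;
using child_weak = std::weak_ptr<child>;

// Hypothetical composite, not RxCpp: only add/remove/unsubscribe are modeled.
class remove_demo {
public:
    child_weak add(const child_ptr& c) {
        std::lock_guard<std::mutex> guard(lock);
        if (issubscribed.load()) subscriptions.insert(c);  // simplified add
        return c;
    }

    void remove(const child_weak& w) {
        if (!issubscribed.load()) return;  // cheap early exit, not sufficient alone
        child_ptr s = w.lock();            // empty if w has expired; never terminates
        if (!s) return;
        std::lock_guard<std::mutex> guard(lock);
        if (!issubscribed.load()) return;  // re-check: unsubscribe may have run meanwhile
        subscriptions.erase(s);
    }

    void unsubscribe() {
        if (!issubscribed.exchange(false)) return;
        std::set<child_ptr> local;
        std::lock_guard<std::mutex> guard(lock);
        local.swap(subscriptions);         // member left empty
    }

    std::size_t size() {
        std::lock_guard<std::mutex> guard(lock);
        return subscriptions.size();
    }

private:
    std::atomic<bool> issubscribed{true};
    std::mutex lock;
    std::set<child_ptr> subscriptions;
};
```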


Another layer of possibly unspecified behavior:

The data race expressed here is that the std::set subscriptions could be used (erase, insert, etc.) even after it has been moved from, as with the erase call in remove() above.

https://en.cppreference.com/w/cpp/container/set/set overload (4)

clear() and unsubscribe() repeat this pattern (even in single-threaded code), since both invoke the move constructor on subscriptions.
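One way to avoid leaving subscriptions in a moved-from state is to swap it with an empty local set under the lock and then process the local copy outside the lock. drain_under_lock is a hypothetical helper, not part of RxCpp; after the swap the member is guaranteed to be empty and remains usable.

```cpp
#include <cassert>
#include <mutex>
#include <set>

// Hypothetical helper: empty `member` under `m` by swapping it with a local
// set, then call `on_each` for every drained element without holding `m`.
template <class T, class F>
void drain_under_lock(std::mutex& m, std::set<T>& member, F&& on_each) {
    std::set<T> local;
    {
        std::lock_guard<std::mutex> guard(m);
        local.swap(member);  // member is now empty (a specified state)
    }
    for (const T& item : local) on_each(item);  // call out without holding m
}
```

std::exchange(member, {}) (C++14) is an equivalent alternative, since it replaces the member with a freshly constructed empty set and returns the old contents.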

kirkshoop commented 5 years ago

I agree with your assessment. Would you like to make a PR with the changes you believe are needed?

iam commented 5 years ago

While reviewing further, I also noticed a possible data race in composite_subscription::unsubscribe, because it keeps a separate copy of the atomic issubscribed.

We can thus have three states:

base_subscription_state       composite_subscription_state
(issubscribed)                (issubscribed)
true                          true
false                         true
false                         false

Furthermore, this can lead to strange behavior in which a thread observes the composite_subscription to be unsubscribed, and yet calling add(s) does not immediately result in s.unsubscribe().

Expected behavior (note the last line):

Thread 1                                Thread 2
   cs.unsubscribe()            
      issubscribed=false          
                                            tmp = cs.is_unsubscribed
                                            cs.add(s)
      (done)                           
                                            s.unsubscribe() /*clear*/

Actual behavior due to the data race (note the last line):

Thread 1                                Thread 2
   cs.unsubscribe()                
      issubscribed=false          
                                           tmp = cs.is_unsubscribed
                                           cs.add(s)
                                           (done)
      cs.issubscribed=false
      s.unsubscribe() /*clear*/

In this case, Thread 2 could call cs.is_subscribed() before the add. If is_subscribed() returns false, one would expect add(s) to definitely call s.unsubscribe().

This is probably not as big an issue as the one in the original post, though. It would be perfectly fine if is_unsubscribed were defined with std::memory_order::relaxed ordering, but I suspect it is implicitly understood to be seq_cst, for consistency with RxJava?
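The "actual behavior" timeline can be replayed deterministically on a single thread by splitting unsubscribe() into its two stores. flag_model and replay_timeline are hypothetical and only model the two-flag layout described above; they are not RxCpp code and do not detect races, they just show the logical outcome of that interleaving compared with a single shared flag.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical model, not RxCpp code: is_subscribed() reads base_flag, while
// add_child() reads composite_flag unless single_flag is set, in which case
// both read base_flag.
struct flag_model {
    explicit flag_model(bool single_flag) : single(single_flag) {}

    bool single;
    std::atomic<bool> base_flag{true};
    std::atomic<bool> composite_flag{true};
    bool child_tracked = false;       // child was inserted into the set
    bool child_unsubscribed = false;  // child's unsubscribe() has run

    bool is_subscribed() const { return base_flag.load(); }

    void add_child() {
        const std::atomic<bool>& flag = single ? base_flag : composite_flag;
        if (!flag.load()) child_unsubscribed = true;  // unsubscribed right away
        else child_tracked = true;                    // left for a later clear()
    }

    // unsubscribe() split into its two steps so they can be interleaved.
    void unsubscribe_step1() { base_flag.store(false); }
    void unsubscribe_step2() {
        composite_flag.store(false);
        if (child_tracked) child_unsubscribed = true;  // the deferred clear()
    }
};

struct replay_result {
    bool observed_unsubscribed;  // what Thread 2 saw from is_subscribed()
    bool unsubscribed_by_add;    // whether add() unsubscribed the child itself
};

// Replays the "actual behavior" timeline step by step on one thread.
inline replay_result replay_timeline(bool single_flag) {
    flag_model cs(single_flag);
    cs.unsubscribe_step1();               // Thread 1: issubscribed=false
    bool observed = !cs.is_subscribed();  // Thread 2: tmp = cs.is_unsubscribed
    cs.add_child();                       // Thread 2: cs.add(s)
    bool by_add = cs.child_unsubscribed;  // Thread 2: (done)
    cs.unsubscribe_step2();               // Thread 1: second flag, then clear
    return {observed, by_add};
}
```

With two flags, add() still treats the composite as subscribed after is_subscribed() has reported false; with one flag it cannot.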

iam commented 5 years ago

> I agree with your assessment. Would you like to make a PR with the changes you believe are needed?

Thanks Kirk. I went ahead and created a PR as you suggested.