ReactiveX / RxSwift

Reactive Programming in Swift
MIT License
24.37k stars 4.17k forks source link

Locks during Event Forwarding in RxSwift #2525

Open firecore opened 1 year ago

firecore commented 1 year ago

Short description of the issue:

We have identified a commonly encountered problem in RxSwift where event forwarding is performed under a lock. It leads to deadlocks and unexpected locks on threads, causing issues in code execution. We created a sample that demonstrates two specific problems withtestDeadlock and testTemporaryLock methods within the RxTester class.

Expected outcome:

We assume that event forwarding should be performed outside of the locked sections. However, if there are cases that require forwarding under a lock to guarantee that only one event is forwarded simultaneously, it should use a separate lock.

What actually happens:

The issue of event forwarding under a lock has widespread implications throughout the RxSwift codebase.

Problem 1: testDeadlock

This example illustrates the deadlock caused by a specific combination of Rx operators. The major root causes:

Problem 2: testTemporaryLock

The testTemporaryLock example showcases how a lightweight event generation (e.g., on the main thread) can unexpectedly wait on a lock due to the heavyweight processing of events in a background thread. It happens because some operators (e.g., ‘debounce’) may receive and forward events in different threads but do that under the same lock.

Presumably, similar problems might reproduce with other operators. For example, all classes using ‘SynchronizedOnType’ protocol automatically follow the pattern; some classes not using it might have similar logic (like ShareReplay1WhileConnectedConnection). At the same time, many pieces of RxSwift code perform event forwarding after locked sections, and it looks right.

Self contained code example that reproduces the issue:

import Foundation
import RxSwift

class RxTester
{
    private let bag = DisposeBag()

    // Some observable that starts with an event and doesn't complete instantly.
    private func makeSomeObservable<T>(firstValue: T) -> Observable<T>
    {
        Observable<T>.create({ observer in
            observer.onNext(firstValue)
            return Disposables.create()
        })
    }

    /*
     This test demonstrates a deadlock, which is possible because:
     * share(replay: 1) performs event forwarding for non-first subscribers under its lock (see ShareReplay1WhileConnectedConnection.synchronized_subscribe(_:))
     * combineLatest, using SynchronizedOnType protocol, performs event forwarding under its lock

     Here's the scheme of the locked state of the threads:
     BG: -sub(1st)-> sharedSequence2 -sub(2nd)-> sharedSequence1 -on[locked]-> sharedSequence2 -on-> combLatest [waiting on lock]**
     Current: -sub-> complexSequence -sub-> flatMapLatest -sub-> combLatest -on[locked]*-> flatMapLatest's SwitchSink -sub-> sharedSequence1 [waiting on lock]
       * When combLatest is subscribed in the current thread, its 1st component produces a 'stWithValue' and tries to subscribe to sharedSequence2. But since it's not the 1st subscriber, then combLatest (it's 1st component) is only added to sharedSequence2's observers without immediate event (which is not ready yet, because sharedSequence2 is still performing its 'connect' at the moment). So, combLatest subscribes to its 2nd component, receives 'justValue' and starts forwarding its pair of values to flatMapLatest.
       ** Later, when BG thread completes its slow start of sharedSequence2, it forwards its 1st event to all observers, including the previously subscribed combLatest.
     */
    func testDeadlock()
    {
        let sharedSequence1 = makeSomeObservable(firstValue: "obsValue")
            .share(replay: 1)

        // We perform a preliminary 1st subscription to sharedSequence1, so that next subscriptions to it will trigger event forwarding under its lock.
        sharedSequence1.subscribe { _ in
            NSLog("First subscription to sharedSequence1 finished in current thread (got an event)")
        }.disposed(by: bag)

        let sharedSequence2 = sharedSequence1.do(onNext: { _ in
            // This sleep makes the 1st subscription to sharedSequence2 slow to provide a stable reproducibility.
                Thread.sleep(forTimeInterval: 0.5)
            })
            .share(replay: 1)

        DispatchQueue.global().async
        {
            sharedSequence2.subscribe { _ in
                NSLog("First subscription to sharedSequence2 finished in another thread (got an event)")
            }.disposed(by: self.bag)
        }

        // Sleep to ensure that 1st subscription to sharedSequence2 is in process in another thread.
        Thread.sleep(forTimeInterval: 0.2)

        // startWith allows combineLatest to produce a value even though sharedSequence2 is not ready yet.
        let combLatest = Observable.combineLatest(sharedSequence2.startWith("stWithValue"),
                                                  Observable.just("justValue"))

        // combLatest maps into another sequence depending on the sharedSequence1 (for simplicity it's just sharedSequence1 here).
        let complexSequence = combLatest.flatMapLatest { _ in
            return sharedSequence1
        }

        complexSequence.subscribe { _ in
            NSLog("complexSequence produced an event")
        }.disposed(by: bag)

        NSLog("testDeadlock has finished (shouldn't happen)")
    }

    private static var previousTimerEventDate: Date!

    /*
     This test demonstrates unexpected temporary locks on the main thread, which happens because:
     * debounce operator receives and forwards events under the same lock
     */
    func testTemporaryLock()
    {
        Self.previousTimerEventDate = .init()

        let mainThreadEventsGenerator = Observable<Int>.timer(.zero,
                                                              period: .seconds(1),
                                                              scheduler: MainScheduler.instance)
            .do(onNext: { event in
                let timeSince = Date().timeIntervalSince(Self.previousTimerEventDate)
                NSLog("Timer produced new event: \(event),  time since previous event: \(timeSince)")
                Self.previousTimerEventDate = .init()
            })

        let scheduler = SerialDispatchQueueScheduler(qos: .default)
        let backgroundProcessingSubscription = mainThreadEventsGenerator.debounce(.milliseconds(0), scheduler: scheduler)
            .subscribe { _ in
                NSLog("Processing next debounced event in thread \(Thread.current)...")
                // Imitation of a heavy task.
                Thread.sleep(forTimeInterval: 5.0)
                NSLog("Processed")
            }

        DispatchQueue.global().asyncAfter(deadline: .now() + 30.0) {
            NSLog("Will dispose subscription \(backgroundProcessingSubscription)")
            backgroundProcessingSubscription.dispose()
        }
    }
}

Reproduction Steps:

To reproduce the issue, follow these steps:

  1. Create an instance of RxTester
  2. Call the desired method, such as testTemporaryLock()
let tester = RxTester()
tester.testTemporaryLock()

RxSwift/RxCocoa/RxBlocking/RxTest version/commit

version or commit here

Platform/Environment

How easy is to reproduce? (chances of successful reproduce after running the self contained code)

Xcode version:

Xcode 14.1

⚠️ Fields below are optional for general issues or in case those questions aren't related to your issue, but filling them out will increase the chances of getting your issue resolved. ⚠️

Installation method:

I have multiple versions of Xcode installed: (so we can know if this is a potential cause of your issue)

Level of RxSwift knowledge: (this is so we can understand your level of knowledge and formulate the response in an appropriate manner)