ReactiveX / RxSwift

Reactive Programming in Swift
MIT License

Race conditions cause event to be delivered after sequence is disposed #2268

Closed by anton-plebanovich 2 years ago

anton-plebanovich commented 3 years ago

Short description of the issue: Sometimes a next event is emitted after the sequence has been disposed.

Expected outcome: There shouldn't be events after the sequence is disposed.

What actually happens: Sometimes there are events after the sequence is disposed.

Self contained code example that reproduces the issue: Please clone, install pods and run https://github.com/anton-plebanovich/SingleView/tree/20110f6c1efefae3225cca57ef4e198357184d28

or just test this code:

        DispatchQueue.global().async {
            while true {
                let disposeBag = DisposeBag()
                let uuid = UUID().uuidString
                let scheduler = SerialDispatchQueueScheduler(queue: DispatchQueue.global(qos: .default),
                                                             internalSerialQueueName: "RxSwift-Test-\(uuid)")

                Observable.just(1)
                    .observeOn(MainScheduler.instance)
                    .debug("mainScheduler - \(uuid)")
                    .do(onNext: { [weak disposeBag] _ in print(disposeBag!) })
                    .subscribeOn(MainScheduler.instance)
                    .observeOn(scheduler)
                    .subscribeOn(scheduler)
                    .debug("scheduler - \(uuid)")
                    .subscribe()
                    .disposed(by: disposeBag)
            }
        }

Valid output looks like this:

2020-12-30 15:32:51.181: scheduler - E9267A6E-56D2-4644-A7E4-5EC7EBE75D7D -> subscribed
2020-12-30 15:32:51.182: scheduler - E9267A6E-56D2-4644-A7E4-5EC7EBE75D7D -> isDisposed

Crash output looks like this:

2020-12-30 15:32:09.159: scheduler - 20D2B95A-1369-43AE-95B8-DEBF530C0D41 -> subscribed
2020-12-30 15:32:09.161: scheduler - 20D2B95A-1369-43AE-95B8-DEBF530C0D41 -> isDisposed
2020-12-30 15:32:09.161: mainScheduler - 20D2B95A-1369-43AE-95B8-DEBF530C0D41 -> subscribed
2020-12-30 15:32:09.163: mainScheduler - 20D2B95A-1369-43AE-95B8-DEBF530C0D41 -> Event next(1)

RxSwift/RxCocoa/RxBlocking/RxTest version/commit RxSwift (5.1.1)

Platform/Environment

How easy is it to reproduce? (chances of successfully reproducing after running the self-contained code)

Xcode version:

Version 12.3 (12C33)

Installation method:

I have multiple versions of Xcode installed: (so we can know if this is a potential cause of your issue)

Level of RxSwift knowledge: (this is so we can understand your level of knowledge and formulate the response in an appropriate manner)

freak4pc commented 3 years ago

Hey @anton-plebanovich, thanks for reporting, I'll try your example in a bit.

Just wondering - what is the practical use case that you noticed this occur in?

freak4pc commented 3 years ago

Your example doesn't really allow tracking any issue because it produces too much output. Please provide a more focused example and a practical use case where this happens.

Notice that:

  1. As is, your example crashes for me because disposeBag gets nilled out at the end of the scope. If you want to avoid this you need to move the disposeBag to outside the scope.
  2. You are mixing calling DispatchQueue.async by hand and using a scheduler. This can sometimes cause unexpected side effects. I'd suggest staying in the scheduler realm.
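
To illustrate point 1, here is a minimal sketch in plain Swift with no RxSwift dependency. `Bag` is a hypothetical stand-in for `DisposeBag`, and the function names are invented for this example. It suggests why the force-unwrap in the original snippet can crash: a weakly captured object is already gone once the scope that owned it has ended, while an object owned by a longer-lived scope is still reachable.

```swift
// Hypothetical stand-in for DisposeBag; any class instance behaves the same way.
final class Bag {}

// The bag is owned only by this function's scope, so it is released on return.
// The closure holds it weakly and therefore cannot keep it alive.
func probeWithShortLivedBag() -> () -> Bool {
    let bag = Bag()
    return { [weak bag] in bag != nil }
}

func demo() -> (shortLived: Bool, longLived: Bool) {
    let shortProbe = probeWithShortLivedBag()

    // Here the bag is owned by the caller's scope instead.
    let bag = Bag()
    let longProbe: () -> Bool = { [weak bag] in bag != nil }

    // withExtendedLifetime guarantees the bag is not released before the probe runs.
    let longLived = withExtendedLifetime(bag) { longProbe() }
    return (shortProbe(), longLived)
}

let result = demo()
print(result.shortLived, result.longLived) // prints "false true"
```

In the original loop, each iteration plays the role of `probeWithShortLivedBag`: the bag dies at the end of the iteration, so any closure that runs later sees `nil`.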

Waiting for a reproducible example and expected output :)

Thanks!

anton-plebanovich commented 3 years ago

@freak4pc The crash there is intentional: it is a way to catch the issue, since it's a race condition. You just need to set an exception breakpoint to be able to debug the crash state.

Here is an example of the same behavior without using DispatchQueue.async - https://github.com/anton-plebanovich/SingleView/tree/c5805e9163aad5800a7fd827c7644b5a0385ff14

        while true {
            let disposeBag = DisposeBag()
            let uuid = UUID().uuidString

            let scheduler1 = SerialDispatchQueueScheduler(queue: DispatchQueue.global(qos: .default),
                                                          internalSerialQueueName: "RxSwift-Test-1-\(uuid)")

            let scheduler2 = SerialDispatchQueueScheduler(queue: DispatchQueue.global(qos: .default),
                                                          internalSerialQueueName: "RxSwift-Test-2-\(uuid)")

            Observable.just(1)
                .observeOn(scheduler2)
                .debug("scheduler2 - \(uuid)")
                .do(onNext: { [weak disposeBag] _ in
                    if disposeBag == nil {
                        print("Operation executed after disposal   - \(uuid)")
                        fatalError("Please put a breakpoint on that line or install an exception breakpoint")
                    }
                })
                .subscribeOn(scheduler2)
                .observeOn(scheduler1)
                .debug("scheduler1 - \(uuid)")
                .subscribeOn(scheduler1)
                .subscribe()
                .disposed(by: disposeBag)
        }

anton-plebanovich commented 2 years ago

Closing this one, since this is how concurrency and RxSwift are actually designed to work. Once an operation such as .do(onNext:) has started, it doesn't check whether the chain has been disposed and runs to completion. My bad.
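
The behavior described in this closing comment can be modeled without RxSwift. The sketch below is a simplified model under stated assumptions, not RxSwift's implementation: `Probe` is a hypothetical stand-in for a disposable, and semaphores force the disposal to land while the work is already in flight, so the outcome is deterministic rather than a race.

```swift
import Foundation

// Hypothetical stand-in for a disposable: a lock-protected "disposed" flag plus
// a record of whether the side effect ran after disposal.
final class Probe: @unchecked Sendable {
    private let lock = NSLock()
    private var disposed = false
    private var ranAfterDispose = false

    var isDisposed: Bool {
        lock.lock(); defer { lock.unlock() }
        return disposed
    }

    var sideEffectRanAfterDispose: Bool {
        lock.lock(); defer { lock.unlock() }
        return ranAfterDispose
    }

    func dispose() {
        lock.lock(); defer { lock.unlock() }
        disposed = true
    }

    // Models the body of a handler such as do(onNext:): it records the state
    // of the flag at the moment the side effect actually runs.
    func recordSideEffect() {
        lock.lock(); defer { lock.unlock() }
        ranAfterDispose = disposed
    }
}

func runDemo() -> Bool {
    let probe = Probe()
    let started = DispatchSemaphore(value: 0)
    let disposedSignal = DispatchSemaphore(value: 0)
    let finished = DispatchSemaphore(value: 0)

    DispatchQueue.global().async {
        // The flag is checked once, before the work starts.
        guard !probe.isDisposed else { finished.signal(); return }
        started.signal()         // the work is now in flight
        disposedSignal.wait()    // disposal happens while we are paused here
        probe.recordSideEffect() // no second check, so the side effect still runs
        finished.signal()
    }

    started.wait()
    probe.dispose()
    disposedSignal.signal()
    finished.wait()
    return probe.sideEffectRanAfterDispose
}

print(runDemo()) // prints "true"
```

A handler that must not act after disposal would have to re-check its own condition inside the closure, as the second reproduction does with `if disposeBag == nil`, and return early instead of crashing.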