ReactiveX / RxSwift

Reactive Programming in Swift
MIT License
24.32k stars 4.17k forks source link

RxBlocking does't work well with PublishSubject and a suggestion to improve it #2617

Open xrloong opened 1 month ago

xrloong commented 1 month ago

Short description of the issue:

I know there is already an issue, #1480, discussing this case, and @abdulowork had replied the reason and gave an alternative way to test PublishSubject.

But I don't think the limitation is reasonable since when we test some mechanism using Observable, we not know if this observable is derived from PublishSubject or not.

Moreover, it's possible that developers may change underlying implementation from ReplaySubject to PublishSubject for some reason, and then related test cases failed

Thus, I suggest to fix the behavior of for Observable.toBlocking(). Here is the possible implementation:

private var disposeBag: DisposeBag!
extension ObservableConvertibleType {
    public func toBlocking(timeout: TimeInterval? = nil) -> BlockingObservable<Element> {
         let replaySubject = ReplaySubject<Element>.createUnbounded()

         // forward all events to replaySubject
         asObservable().subscribe(replaySubject)
             .disposed(by: disposeBag)

         return BlockingObservable(timeout: timeout, source: replaySubject)
    }
}
markst commented 1 month ago

Might share this for inspiration: https://github.com/albertbori/TestableCombinePublishers

Where by it's possible to setup an expectation which collects the output of a publisher, then later the expectation can be waited until an output is produced. For example:

let expected = sut.currentPlayingInfo.playProgress
    .compactMap { $0?.progress }
    .map { $0.rounded() }
    .first()
    .expect(expectTime) // <- Using `TestableCombinePublishers`

await play(
    actionHandler: sut.actionHandler,
    media: media,
    seek: initialTime
)

await sut.actionHandler.performAction(.skipBackward(skipInterval))

expected.waitForExpectations(timeout: timeout)