ReactiveX / RxSwift

Reactive Programming in Swift
MIT License

Stack overflows caused by MergeLimitedSink operators #2615

Open geoffmacd opened 1 month ago

geoffmacd commented 1 month ago

Short description of the issue:

At Dropbox, we use RxSwift heavily on a serial queue that must be FIFO, to process work that sometimes requires us to wait (for reasons I'm not going to go into). We have one main observable that represents our input, and it uses concatMap in conjunction with .just() and .delay() to achieve this. The delay, which is rare, is under 3 seconds. The input generally arrives all at once (tens of thousands of elements in a short period of time). Since at least 2021, our top crash has been a stack overflow in Rx code that we've never been able to address. This crash affects a minority of users on launch and was rarely reproduced until now...

We found a reproduction case (see the sample code) that can cause stack overflows when using the standard RxSwift concatMap() operator (or concat()/merge(maxConcurrent:) in Merge.swift) in combination with randomly delayed sequences. This example function will produce extremely deep stack traces (or crash directly with a stack overflow if you are lucky). What seems to matter is that the inner sequences are neither all delayed nor all non-delayed: there has to be a random mix of delayed and non-delayed "just" elements.

If you run this code and a crash doesn't happen, you can at least see a very deep stack inside MergeLimitedSinkIter.on using the Thread API (just print Thread.callStackReturnAddresses.count). This is the source of the stack overflow crash we are experiencing for some users.
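As a rough illustration of the stack-depth check mentioned above, here is a minimal sketch that uses only Foundation. The helper names `currentStackDepth` and `depthAfterRecursing` are hypothetical and exist only for this example. Foundation may cap the number of frames it reports, so treat the value as a relative indicator rather than an exact frame count.

```swift
import Foundation

/// Number of return addresses Foundation reports for the calling thread's stack.
@inline(never)
func currentStackDepth() -> Int {
    Thread.callStackReturnAddresses.count
}

/// Recurses `levels` times and returns the deepest stack depth observed.
@inline(never)
func depthAfterRecursing(_ levels: Int) -> Int {
    let here = currentStackDepth()
    guard levels > 0 else { return here }
    // Not a tail call: the result is combined with `here`, so this frame stays alive.
    return max(here, depthAfterRecursing(levels - 1))
}

let shallow = depthAfterRecursing(0)
let deep = depthAfterRecursing(50)
print("shallow: \(shallow), deep: \(deep)")
```

In an Rx pipeline the same expression could be printed from inside an onNext handler to watch the depth grow while the sequence is running.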

It is a concurrency issue where a .just() emitting immediately on the current queue seems to mess up all internal uses of MergeLimitedSinkIter (which backs concat/concatMap/merge(maxConcurrent:)).

Expected outcome:

The code sample below should be protected against stack overflows by intelligently scheduling the next inner subscribe.
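To make the idea of "scheduling the next inner subscribe" concrete, here is a toy sketch that does not use RxSwift at all. It is not the MergeLimitedSink implementation and not the fix proposed for this issue; `Inner`, `RecursiveConcat`, and `TrampolinedConcat` are hypothetical names, and the code assumes single-threaded use. It only shows how subscribing to the next queued inner sequence from inside the previous completion callback nests one frame per element when inners complete synchronously, and how a drain loop (a simple trampoline) keeps the depth constant.

```swift
import Foundation

/// Toy stand-in for an inner sequence: it calls `onCompleted` when it finishes.
typealias Inner = (_ onCompleted: @escaping () -> Void) -> Void

/// Naive concat: subscribes to the next inner from inside the completion callback.
final class RecursiveConcat {
    private var pending: [Inner]
    private var depth = 0
    private(set) var maxDepth = 0

    init(_ inners: [Inner]) { pending = inners.reversed() }

    func start() { subscribeNext() }

    private func subscribeNext() {
        guard let inner = pending.popLast() else { return }
        depth += 1
        maxDepth = max(maxDepth, depth)
        // A synchronous completion re-enters while this frame is still live.
        inner { self.subscribeNext() }
        depth -= 1
    }
}

/// Trampolined concat: a re-entrant completion only sets a flag for the drain loop.
final class TrampolinedConcat {
    private var pending: [Inner]
    private var isDraining = false
    private var needsAnotherPass = false
    private var depth = 0
    private(set) var maxDepth = 0

    init(_ inners: [Inner]) { pending = inners.reversed() }

    func start() { drain() }

    private func drain() {
        if isDraining {
            // Called from inside the loop below: ask it to continue instead of nesting.
            needsAnotherPass = true
            return
        }
        isDraining = true
        defer { isDraining = false }
        repeat {
            needsAnotherPass = false
            guard let inner = pending.popLast() else { return }
            depth += 1
            maxDepth = max(maxDepth, depth)
            inner { self.drain() }
            depth -= 1
        } while needsAnotherPass
    }
}

// An inner that completes synchronously, in the spirit of `just`.
let justLike: Inner = { onCompleted in onCompleted() }

let recursive = RecursiveConcat(Array(repeating: justLike, count: 1_000))
recursive.start()
let trampolined = TrampolinedConcat(Array(repeating: justLike, count: 1_000))
trampolined.start()
print(recursive.maxDepth, trampolined.maxDepth) // prints "1000 1"
```

If an inner completes later instead of synchronously, the loop exits, the draining flag is cleared, and the eventual completion callback starts a fresh drain pass, so ordering is preserved in both cases.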

What actually happens:

Stack overflow that looks like this:

[Screenshot: Pasted Graphic 1]

Self contained code example that reproduces the issue:

func generateStackOverflow() {
    print("starting rx concatMap/just subscribe")

    let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
    // stack overflow
    Observable.from(Array(repeating: 1, count: 100_000))
        .observe(on: scheduler)
        .concatMap {
            // produces super large stack traces when mixing ConcatMap+Just+Delay
            if Int.random(in: 0 ... 100) != 0 {
                return Single.just($0)
            } else {
                return Single.just($0).delay(.nanoseconds(Int.random(in: 1 ... 7)), scheduler: scheduler)
            }
        }
        .subscribe(onCompleted: {
            print("finished rx concatMap")
        })
        // disposeBag is assumed to be a DisposeBag owned by the enclosing type
        .disposed(by: disposeBag)
}

RxSwift/RxCocoa/RxBlocking/RxTest version/commit

We are on 6.6.0, but this has not been addressed or even noted.

Platform/Environment

How easy is to reproduce? (chances of successful reproduce after running the self contained code)

Xcode version:

15.4

Level of RxSwift knowledge: (this is so we can understand your level of knowledge and formulate the response in an appropriate manner)

danielt1263 commented 1 month ago

The simplest solution is to add .delay(.seconds(0), scheduler: scheduler) to the just... Yes?

geoffmacd commented 1 month ago

To both sides of the if statement? Yes, I suppose that would work, but it was a surprise that using the API like this could cause this in the first place. It's weird to have to know to balance it like this.
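For readers following along, the suggested workaround might look roughly like the sketch below. It assumes RxSwift 6 is available; `balancedConcatMap` and its parameters are hypothetical names used only for illustration. The branch that previously returned a bare `Single.just` gets a zero delay, so every inner sequence completes asynchronously on the scheduler.

```swift
import Foundation
import RxSwift

/// Sketch of the workaround: no inner sequence completes synchronously.
/// `balancedConcatMap` is a hypothetical helper, not part of RxSwift.
func balancedConcatMap(count: Int, scheduler: SchedulerType) -> Observable<Int> {
    Observable.from(Array(repeating: 1, count: count))
        .observe(on: scheduler)
        .concatMap { value -> Single<Int> in
            if Int.random(in: 0 ... 100) != 0 {
                // Previously a bare Single.just(value), which completes synchronously.
                return Single.just(value).delay(.seconds(0), scheduler: scheduler)
            } else {
                return Single.just(value)
                    .delay(.nanoseconds(Int.random(in: 1 ... 7)), scheduler: scheduler)
            }
        }
}
```

This trades stack depth for scheduling overhead, since every element now takes an extra hop through the scheduler. The measurements reported later in this thread suggest that cost is noticeable for large inputs.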

danielt1263 commented 1 month ago

Very weird indeed. Especially given that if the body of the closure is only { Single.just($0) } or if it's only { Single.just($0).delay(.nanoseconds(Int.random(in: 1 ... 7)), scheduler: scheduler) }, then the call stack is fine.

geoffmacd commented 1 month ago

Yes, my point is that devs should feel free to pass any given Observable back to concatMap() and not fear stack overflows. I believe my fix should protect against that.

danielt1263 commented 1 month ago

What is the performance hit? I see that if I use `return Observable.just($0).delay(.nanoseconds(0), scheduler: scheduler)` in the if block, it takes 14 seconds to run through the 100,000 elements, whereas if I use `return Observable.generate(initialState: $0, condition: { _ in false }, iterate: { $0 })` it only takes 6 seconds. (Both have a stack size of 31.)

danielt1263 commented 1 month ago

I'm seeing that if I take your fix and use your initial code, the stack size goes up to 42 but the runtime is only 3 seconds... Looks like a good fix to me. (I assume all the tests that use MergeLimitedSinkIter still pass?)

geoffmacd commented 1 month ago

Yes, the tests pass, both in this repo (they didn't auto-run in CI, though) and in the roughly 3,000 unit tests in our own codebase that rely heavily on this function.

When I tested this function on a device (iPhone XS):

        let t1 = Date()
        let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
        // stack overflow
        Observable.from(Array(repeating: 1, count: 100_000))
            .observe(on: scheduler)
            .concatMap {
                return Single.just($0).delay(.nanoseconds(0), scheduler: scheduler)
            }
            .subscribe(
                onCompleted: {
                    print("finished rx concatMap in \(Date().timeIntervalSince(t1)) s")
                }
            )
            .disposed(by: disposeBag)

I got 6.6 seconds runtime with my fix and about the same on master.

geoffmacd commented 2 weeks ago

I found an even simpler repro function for this issue. Only the first inner subscription needs to be a delay (or anything that queues); the DelaySink will then emit all of the queued elements (the Justs) at once.

    func generateStackOverflow() {
        print("starting rx concatMap + just + delay")

        let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
        // stack overflow
        Observable.from(Array(0 ..< 100_000))
            .observe(on: scheduler)
            .concatMap {
                // produces super large stack traces when mixing ConcatMap+Just+Delay
                if $0 == 0 {
                    // only the first element is delayed; the `DelaySink` will then emit all elements that arrived during this delay
                    return Single.just($0).delay(.seconds(1), scheduler: scheduler)
                } else {
                    return Single.just($0)
                }
            }
            .subscribe(
                onCompleted: {
                    print("finished rx concatMap + just + delay")
                }
            )
            .disposed(by: disposeBag)
    }