Open DevAndArtist opened 1 year ago
Hey @DevAndArtist - Thanks for the suggestion and interesting linked discussion :)
I appreciate the code sample - for the time being this change in Swift would be considered breaking behavior for the behavior of AsyncStream, so I'd want to make sure it's for real changing before making arbitrary changes.
I'll keep this thread open while the discussion is ongoing and until (if) a resolution is made.
Thanks again!
Short description of the issue:
Today I started a discussion in the Swift forums regarding cooperative task cancellation and
AsyncSequence
types. Digging through the implementations of types likeAsync[Throwing]Stream
I discovered that those were not cooperative at all and will straight cut off the buffer from the upstream during cancellation. With the current non-cooperative behavior there's no issue in RxSwift'svalues
implementation. However if theAsync[Throwing]Stream
were cooperative then the implementation would get stuck and never terminate the stream viaonDispose
.Here's a custom wrapper
AsyncThrowingCooperativeStream
type, which simply avoids the direct cancellation forwarding to the innerAsyncThrowingStream._Storage.cancel()
method, which would cause the termination of the buffer by callingAsyncThrowingStream._Storage.finish()
. It gives the custom logic such as a wrappedTask
or anObservable
to properly receive the dispose message and forward its decision through theonDispose
back to the captured continuation.Here's the wrap similar to the implementation in the RxSwift module:
As you can see,
onDispose
finishes the cooperative stream continuation by throwing aCancellationError
. An infinite runningObservable
which gets deposed without an error will essentially callonDispose
, but it will never reach correctly the continuation, at least not a cooperative one.Note:
continuation.finish
will change the internal state in such way that a subsequent call to it would result into a no-op. That said, if anObservable
completes viaonCompleted
first, a call toonDisposed
will not actually throwCancellationError
asterminal
would already equalfinished
andonTermination
closure would already benil
-ed.Expected outcome:
Right now, it's not a bug, but if
AsyncThrowingStream
ever gets changed to be cooperative on task cancellation, thevalues
implementation will get stuck.What actually happens:
Nothing right now. The
AsyncThrowingStream
buffer gets immediately terminated on cancellation and anything coming theObservable
subscription will be completely ignored. You can use the aboveAsyncThrowingCooperativeStream
to actually reproduce the potential future issue.RxSwift/RxCocoa/RxBlocking/RxTest version/commit
Compared with RxSwift 6.5.0
Platform/Environment
Any.
How easy is to reproduce? (chances of successful reproduce after running the self contained code)
Xcode version:
Installation method:
I have multiple versions of Xcode installed:
Level of RxSwift knowledge: