sherlock1982 closed 5 years ago
Hmmm. Interesting. Thanks for your thoughts here.
Yes, it looks like a couple of distinct issues:
1. The behaviour of ReplaySubject appears non-standard. It should behave more in the manner described at http://reactivex.io/RxJava/javadoc/io/reactivex/subjects/ReplaySubject.html to remain faithful to its namesake.
2. We still don't offer a non ref-counted version of the share(replay:) operator. It would be good to add an operator with the signature shareReplay(replay count: Int, refCount: Bool = true) to round out the expected behaviour.
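To make the first point concrete, here is a minimal sketch of the replay behaviour the RxJava documentation describes: a late subscriber still receives the buffered values, followed by the completion, even if the subject finished before it subscribed. TinyReplaySubject and Event are hypothetical names for a toy model in plain Swift; this is not CombineExt's ReplaySubject and it ignores demand, failures and thread safety.

```swift
// Toy model for illustration only; not CombineExt's ReplaySubject.
enum Event<Value: Equatable>: Equatable {
    case value(Value)
    case finished
}

final class TinyReplaySubject<Value: Equatable> {
    private let bufferSize: Int
    private var buffer: [Value] = []
    private var isFinished = false
    private var observers: [(Event<Value>) -> Void] = []

    init(bufferSize: Int) {
        self.bufferSize = bufferSize
    }

    func send(_ value: Value) {
        guard !isFinished else { return }
        // Keep only the most recent `bufferSize` values.
        buffer.append(value)
        if buffer.count > bufferSize { buffer.removeFirst() }
        observers.forEach { $0(.value(value)) }
    }

    func finish() {
        guard !isFinished else { return }
        isFinished = true
        observers.forEach { $0(.finished) }
        observers.removeAll()
    }

    func subscribe(_ observer: @escaping (Event<Value>) -> Void) {
        // Replay the buffer first, even if the subject has already finished.
        buffer.forEach { observer(.value($0)) }
        if isFinished {
            observer(.finished)
        } else {
            observers.append(observer)
        }
    }
}

let subject = TinyReplaySubject<Int>(bufferSize: 2)
subject.send(1)
subject.send(2)
subject.send(3)
subject.finish()

// This subscriber arrives after completion.
var lateEvents: [Event<Int>] = []
subject.subscribe { lateEvents.append($0) }
// lateEvents now holds .value(2), .value(3), .finished
```

With a buffer size of 2, the late subscriber sees the last two values and then the completion, rather than the completion alone.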
Does that sum things up?
Yes I think that's very close.
1. The current version behaves close to RxJS shareReplay({ refCount: false }), though it has an issue: it stops replaying after the Source has completed. The important thing is that the Subject here remains the same and never dereferences its replay buffer, even if there are no subscribers.
2. There needs to be a version like shareReplay({ refCount: true }), which will disconnect from the Source when there are no subscribers left and clean up the replay buffer. Then it should start from scratch on the first new subscriber. I believe it doesn't matter whether the Source has already completed or not.
Honestly, my opinion is that variant 1 is an issue by itself. RxJava simply has variant 2. The RxJS team introduced variant 2 because they needed compatibility. I have never used variant 1 anywhere because it keeps stuck data inside which can't be removed, and it never resubscribes to the Source, which looks like a broken stream to me.
The important thing is that the Subject here remains the same and never dereferences its replay buffer, even if there are no subscribers.
I assume you mean when using the share(replay:) operator and not ReplaySubject directly? If so, that's curious, as my understanding of Combine's autoconnect() operator is that it should behave the same as refCount() and therefore release any upstream resources when the last subscriber cancels. Something to investigate for sure.
In the meantime, let's commit to making the behaviour of ReplaySubject consistent with other implementations.
I'm a one-week expert in Swift, but I still tried to do my best in the pull request :-)
Though there's another strange case which I don't understand. Consider the following code:
import Combine
import CombineExt

let externalCommand = CurrentValueSubject<Bool, Never>(true)
let stream = externalCommand
    .map { _ in Just(1) }
    .switchToLatest()
    .print("Debug: ")
    .share(replay: 1)

// First subscriber: subscribe, then cancel immediately.
let subscription1 = stream.sink(receiveValue: {
    print("Receive: \($0)")
})
subscription1.cancel()

// Second subscriber: arrives after the first one has cancelled.
let subscription2 = stream.sink(receiveValue: {
    print("Receive: \($0)")
})
subscription2.cancel()
Why does subscription2 receive the value twice? I believe it should receive it only once. It shouldn't resubscribe in the current version, should it?
Actually, I figured it out. In this case ReplaySubject is not re-created, but Combine magically resubscribes to the Source. Therefore I got one element from the ReplaySubject buffer and one element from the Source. ReplaySubject doesn't control the subscription to the Source.
And here goes our case:
OK, I've finished. Please review the pull request; I finally implemented what I meant. It looks like ReplaySubject is not recreated with .autoconnect(), so the decision whether to clean up the internal buffer should be implemented there. Currently it works for me in both variants.
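The effect described above can be reproduced with a small toy model, sketched here in plain Swift rather than with Combine. It assumes a cold source that emits the same values on every (re)connection, a single long-lived replay buffer, and a connection that is opened by the first subscriber. SharedReplayModel, valuesOnConnect and clearsBufferOnDisconnect are hypothetical names; this is not how Combine or CombineExt are implemented.

```swift
// Toy model for illustration only; not Combine's or CombineExt's implementation.
final class SharedReplayModel {
    private let valuesOnConnect: [Int]        // emitted by the cold source on each (re)connection
    private let bufferSize: Int
    private let clearsBufferOnDisconnect: Bool
    private var buffer: [Int] = []
    private var observers: [Int: (Int) -> Void] = [:]
    private var nextID = 0

    init(valuesOnConnect: [Int], bufferSize: Int, clearsBufferOnDisconnect: Bool) {
        self.valuesOnConnect = valuesOnConnect
        self.bufferSize = bufferSize
        self.clearsBufferOnDisconnect = clearsBufferOnDisconnect
    }

    /// Subscribes and returns a token that can be passed to `cancel(_:)`.
    func subscribe(_ observer: @escaping (Int) -> Void) -> Int {
        buffer.forEach(observer)                  // 1. replay whatever is buffered
        let id = nextID
        nextID += 1
        observers[id] = observer
        if observers.count == 1 { connect() }     // 2. first subscriber (re)connects upstream
        return id
    }

    func cancel(_ id: Int) {
        guard observers.removeValue(forKey: id) != nil else { return }
        // The last subscriber leaving is the point where the buffer can be released.
        if observers.isEmpty && clearsBufferOnDisconnect { buffer.removeAll() }
    }

    private func connect() {
        for value in valuesOnConnect {
            buffer.append(value)
            if buffer.count > bufferSize { buffer.removeFirst() }
            observers.values.forEach { $0(value) }
        }
    }
}

func valuesSeenBySecondSubscriber(clearsBufferOnDisconnect: Bool) -> [Int] {
    let stream = SharedReplayModel(valuesOnConnect: [1],
                                   bufferSize: 1,
                                   clearsBufferOnDisconnect: clearsBufferOnDisconnect)
    let first = stream.subscribe { _ in }
    stream.cancel(first)

    var seen: [Int] = []
    let second = stream.subscribe { seen.append($0) }
    stream.cancel(second)
    return seen
}

let keepBuffer = valuesSeenBySecondSubscriber(clearsBufferOnDisconnect: false)  // [1, 1]
let clearBuffer = valuesSeenBySecondSubscriber(clearsBufferOnDisconnect: true)  // [1]
```

When the buffer survives the disconnect, the second subscriber gets one stale value from the buffer plus one fresh value from the reconnected source. Clearing the buffer when the last subscriber leaves gives a single value from a clean start.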
Thanks for your thoughts, @sherlock1982.
I've modified ReplaySubject to behave as discussed here:
The behaviour of ReplaySubject appears non-standard. It should behave more in the manner described at http://reactivex.io/RxJava/javadoc/io/reactivex/subjects/ReplaySubject.html to remain faithful to its namesake.
Your PR didn't include any unit tests, so I've included these and the result should be available on master.
In terms of the second point:
We still don't offer a non ref-counted version of the share(replay:) operator. It would be good to add an operator with the signature shareReplay(replay count: Int, refCount: Bool = true) to round out the expected behaviour.
It doesn't look like autoconnect() behaves in quite the same way as refCount(), so we'll need to create another issue for this if it's something that people are missing or needing.
Consider the following code involving ReplaySubject:
Only subscription1 will receive values. This is quite clear, because ReplaySubject goes into the completed state when upstream completes.
Issue 1: All late subscribers will receive nothing but the completion.
Issue 2: Internally, ReplaySubject still keeps references to the data in its buffer, which late subscribers will never be able to access. I would consider that a memory leak.
I believe ReplaySubject should behave differently:
When there are no subscribers, we should disconnect from upstream and clean up the buffer. This returns us to a completely cold state, as it was before the first subscription, and data which is no longer accessible is dereferenced.
When a new subscriber comes, we connect to upstream and start from scratch.
When upstream completes, we shouldn't propagate the completion but should still serve data from the buffer to late subscribers.
I don't know how it works in RxSwift but in RxJava:
Works just how I described it. In RxJS it's quite complex and you need to write:
I think since RxJS 6.4.0 it's equal to:
In both cases, when there are no subscribers, the data inside the subject will be dereferenced and upstream will be cancelled. When a new subscriber comes, it starts from scratch.
I know there's a huge holy war in RxJS on this subject. There's a great article showing the differences: here
Could you comment on this please so we can brainstorm together.