Closed geoffmacd closed 4 months ago
The withLatestFrom operator has basically the same behavior as combineLatest in this case. If there is no latest, there's nothing to return.
The solution is for you to use startWith(_:) on the second observable so your "sentinel value" emits if the second observable hasn't emitted a value yet.
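A minimal sketch of that suggestion, assuming two hypothetical PublishSubjects stand in for the observables in question and that -1 is an acceptable sentinel (both are illustrative choices, not taken from the issue):

```swift
import RxSwift

// Hypothetical stand-ins for the two observables discussed in this thread.
let first = PublishSubject<String>()
let second = PublishSubject<Int>()
let sentinel = -1  // assumed marker meaning "second has not emitted yet"

var received: [Int] = []
let subscription = first
    .withLatestFrom(second.startWith(sentinel))
    .subscribe(onNext: { received.append($0) })

first.onNext("a")   // second is still silent, so the sentinel is sampled
second.onNext(42)
first.onNext("b")   // a real value exists now, so 42 is sampled

print(received)     // [-1, 42]
subscription.dispose()
```

Downstream code then has to recognise and handle the sentinel, which is the trade-off of this approach.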
Using startWith is not correct in this case. For highly asynchronous code like my use case, there genuinely must be no value or event until a real one exists. Starting with some default value would be the wrong behavior and lead to other issues.
I suppose my disagreement is with the API contract implied by the word "latest". I would not expect this to silently break or cause a race condition if there is no "latest" value. At least throwing an error, or allowing a different behavior, would be preferable to this situation.
Hmm... Well, not only would your suggestion be a breaking change, but it would also be contrary to the way withLatestFrom is implemented in other Rx implementations (RxJS, for example).
So, as I understand it, you want the second observable to emit immediately if the first observable has emitted something... If the first has emitted two values before second emits its first value, would you want the first two values of second to emit immediately when they come in?
It sounds like you would be better served with a buffer(boundary:) implementation like the one here on line 143. To use it, you would do something more like second.buffer(boundary: first).
With it, every time first emitted, the observable would emit all the values that second emitted since the previous emission of first.
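For readers without access to the linked file, here is a rough sketch of what such an operator could look like. It is not the linked implementation: the name buffer(boundary:) is reused only for illustration, and the completion handling (flush on source completion, ignore boundary completion) is an assumption of this sketch.

```swift
import Foundation
import RxSwift

extension ObservableType {
    /// Hypothetical sketch: collects elements and emits them as an array
    /// every time `boundary` produces a value.
    func buffer<Boundary: ObservableConvertibleType>(boundary: Boundary) -> Observable<[Element]> {
        Observable.create { observer in
            var buffered: [Element] = []
            let lock = NSRecursiveLock()

            let boundarySubscription = boundary.asObservable().subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next:
                    // Flush whatever has accumulated since the last boundary event.
                    observer.onNext(buffered)
                    buffered.removeAll()
                case let .error(error):
                    observer.onError(error)
                case .completed:
                    break  // assumption: a finished boundary does not end the buffer
                }
            }

            let sourceSubscription = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case let .next(element):
                    buffered.append(element)
                case let .error(error):
                    observer.onError(error)
                case .completed:
                    // Assumption: emit the remaining elements before completing.
                    observer.onNext(buffered)
                    observer.onCompleted()
                }
            }

            return CompositeDisposable(boundarySubscription, sourceSubscription)
        }
    }
}

let first = PublishSubject<Void>()
let second = PublishSubject<Int>()
var batches: [[Int]] = []
let subscription = second
    .buffer(boundary: first)
    .subscribe(onNext: { batches.append($0) })

first.onNext(())    // second has produced nothing yet, so an empty batch is emitted
second.onNext(1)
second.onNext(2)
first.onNext(())    // emits [1, 2]
first.onNext(())    // nothing new since the last boundary, so another empty batch

print(batches)      // [[], [1, 2], []]
subscription.dispose()
```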
withLatestFrom doesn't wait or fire on changes for "second", it pulls a "current view" of the "second" and if there is no value there it will not do anything (and the stream will be "stuck"). This isn't a bug but planned behavior.
What you want seems more like combineLatest indeed.
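To make the difference concrete, here is a small sketch that runs both operators over the same inputs. The subjects and the string formatting are illustrative assumptions, not code from the issue.

```swift
import RxSwift

let first = PublishSubject<String>()
let second = PublishSubject<Int>()
let bag = DisposeBag()

var sampled: [String] = []    // output of withLatestFrom
var combined: [String] = []   // output of combineLatest

first.withLatestFrom(second) { "\($0)\($1)" }
    .subscribe(onNext: { sampled.append($0) })
    .disposed(by: bag)

Observable.combineLatest(first, second) { "\($0)\($1)" }
    .subscribe(onNext: { combined.append($0) })
    .disposed(by: bag)

first.onNext("a")   // second has no value: withLatestFrom drops this event
second.onNext(1)    // combineLatest emits "a1"; withLatestFrom stays silent
second.onNext(2)    // combineLatest emits "a2"
first.onNext("b")   // both emit "b2"

print(sampled)      // ["b2"]
print(combined)     // ["a1", "a2", "b2"]
```

Note that combineLatest also fires on every change of second, which may be more than the use case needs.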
Lastly @geoffmacd ... If you want to describe the exact behavior you want, I'm happy to work with you and make a custom operator. It would be best to do this on the Slack channel rather than here though I think...
After looking at other Rx implementations, the current implementation appears to be correct: https://rxmarbles.com/#withLatestFrom. So I agree now that it shouldn't be changed.
For the record, I made a change in this diff anyway that keeps backwards compatibility but adds the behavior I expected through a new behavior parameter, similar to other operators. See the tests for the withLatestFrom(..., behavior: .deferred) functionality I had expected. I'm not going to PR this, but I am wondering if it adds value for other devs. combineLatest is also not quite what I want.
https://github.com/ReactiveX/RxSwift/compare/main...geoffmacd:RxSwift:withlatestfrom
Here is your operator without having to maintain your own branch of RxSwift:
import Foundation  // NSRecursiveLock
import RxSwift

extension ObservableType {
    func withDeferredLatestFrom<Source>(_ second: Source) -> Observable<Source.Element>
        where Source: ObservableConvertibleType
    {
        Observable.create { observer in
            var triggered = false               // self emitted before second had any value
            var latest = Source.Element?.none   // most recent value from second
            let lock = NSRecursiveLock()        // guards triggered and latest
            let source = second.asObservable().subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case let .next(element):
                    latest = element
                    // Serve a trigger that arrived while no value was available.
                    if triggered {
                        observer.onNext(element)
                        triggered = false
                    }
                case let .error(error):
                    observer.onError(error)
                case .completed:
                    break
                }
            }
            let trigger = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next:
                    if let element = latest {
                        observer.onNext(element)
                        triggered = false
                    } else {
                        // Nothing to sample yet: remember the trigger instead of dropping it.
                        triggered = true
                    }
                case let .error(error):
                    observer.onError(error)
                case .completed:
                    observer.onCompleted()
                }
            }
            return CompositeDisposable(source, trigger)
        }
    }
}
Let's think this through, though. Are you sure you want the Observable to abruptly stop if the source completes while the operator is still waiting for a value from second, or should the operator wait and emit a last value from second before completing?
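If the second option is the one you want, a possible variant could look like the sketch below. The operator name, the extra flags, and the rule that a completed second releases a postponed completion are all assumptions of this sketch, not something agreed on in this thread.

```swift
import Foundation
import RxSwift

extension ObservableType {
    /// Hypothetical variant: if self completes while a trigger is still pending,
    /// completion is postponed until second delivers a value.
    func withDeferredLatestFromCompletingLate<Source: ObservableConvertibleType>(
        _ second: Source
    ) -> Observable<Source.Element> {
        Observable.create { observer in
            var pending = false           // a trigger arrived before any value from second
            var sourceCompleted = false   // self has finished
            var secondCompleted = false   // second has finished
            var latest: Source.Element?
            let lock = NSRecursiveLock()

            let secondSubscription = second.asObservable().subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case let .next(element):
                    latest = element
                    if pending {
                        pending = false
                        observer.onNext(element)
                        if sourceCompleted { observer.onCompleted() }
                    }
                case let .error(error):
                    observer.onError(error)
                case .completed:
                    secondCompleted = true
                    // No further value can arrive, so release a postponed completion.
                    if sourceCompleted && pending { observer.onCompleted() }
                }
            }

            let triggerSubscription = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next:
                    if let element = latest {
                        observer.onNext(element)
                    } else {
                        pending = true
                    }
                case let .error(error):
                    observer.onError(error)
                case .completed:
                    sourceCompleted = true
                    if !pending || secondCompleted { observer.onCompleted() }
                }
            }

            return CompositeDisposable(secondSubscription, triggerSubscription)
        }
    }
}

let first = PublishSubject<Void>()
let second = PublishSubject<Int>()
var events: [String] = []

let subscription = first
    .withDeferredLatestFromCompletingLate(second)
    .subscribe { event in
        switch event {
        case let .next(value): events.append("next(\(value))")
        case .error: events.append("error")
        case .completed: events.append("completed")
        }
    }

first.onNext(())      // nothing to sample yet, so the trigger is remembered
first.onCompleted()   // completion is postponed because a trigger is pending
second.onNext(7)      // the pending trigger is served, then the sequence completes

print(events)         // ["next(7)", "completed"]
subscription.dispose()
```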
This seems safe to close now...
Short description of the issue:
If you use withLatestFrom() and the second observable is delayed, after getting 1 event from the first observable, the result will silently fail and not emit another event until the next first event. There is an early return that does nothing (no event of any kind) which, to me, was super unexpected and caused a timing-related bug.
Expected outcome:
I expect that withLatestFrom should return some kind of event if the second observable does not have immediate values upon subscription but has deferred events. It should not just be ignored and returned early as the code does in https://github.com/ReactiveX/RxSwift/blob/bfe5015ba3539ae6040db059f976650ed117aa13/RxSwift/Observables/WithLatestFrom.swift#L76
What actually happens:
In my case, we have something like this:
When I changed the logic in diskSpaceAcceptable to asynchronously generate the first event (takes a while), I had reasonably expected that the withLatestFrom functionality would wait until there was a value before emitting the combined values. This does not happen due to this early return https://github.com/ReactiveX/RxSwift/blob/bfe5015ba3539ae6040db059f976650ed117aa13/RxSwift/Observables/WithLatestFrom.swift#L76. There is no Event at all, not a next, error, or completion. This means that values from enabled were being ignored and this code would be stuck here and never generate an event. Only if diskSpaceAcceptable had immediate values would this return anything.
I do not believe this should silently do nothing and cause race conditions as it did for us. I think withLatestFrom should either a) wait until a value is generated from second OR b) throw an error (I'd prefer A, although it will change default behavior).
Self contained code example that reproduces the issue:
modified example of the testWithLatestFrom_Simple1 illustrating what I mean by the deferred second observable event:
RxSwift/RxCocoa/RxBlocking/RxTest version/commit
version or commit here
Platform/Environment
How easy is to reproduce? (chances of successful reproduce after running the self contained code)
Xcode version: all
Level of RxSwift knowledge: (this is so we can understand your level of knowledge and formulate the response in an appropriate manner)