ReactiveX / rxjs

A reactive programming library for JavaScript
https://rxjs.dev
Apache License 2.0

Interaction of withLatestFrom and combineLatest changes values depending on the subscription order of independent subscribers with shareReplay(1). #6608

Open begrs opened 2 years ago

begrs commented 2 years ago

Discussed in https://github.com/ReactiveX/rxjs/discussions/6605

Originally posted by **begrs** September 20, 2021

Hi, sample code that reproduces my issue is here: https://stackblitz.com/edit/rxjs-mvugpe?file=index.ts

![image](https://user-images.githubusercontent.com/10938362/134024533-624be836-de4a-4297-b544-d80bb176a84a.png)

I have two source observables, input1$ and input2$. When either of them is updated, a computation is triggered in the observable calculated$. calculated$ has two subscribers, one of which is combined$. combined$ also needs the input1$ and input2$ values for its computation (here just a console log).

![image](https://user-images.githubusercontent.com/10938362/134024592-37cbae6e-424c-4a36-85eb-9a972a3e6537.png)

Now one of the inputs is changed to 5. This updates calculated$, which in turn updates secondSub and combined$. What is very weird is that combined$ uses **the OLD value of the changed input**, not the one used for the calculation in calculated$. However, if I move secondSub (line 12) after combined$, or remove it altogether, the console log in combined$ produces the desired output with the CURRENT value of the changed input.

![image](https://user-images.githubusercontent.com/10938362/134024657-af5afd03-7ffd-4fb0-a657-f57ff9cf0222.png) ![image](https://user-images.githubusercontent.com/10938362/134024712-d2361ef4-b1bc-4c46-bcb6-a2da12bd5706.png)

Removing the shareReplay(1) also fixes the issue, but that was kind of expected.
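The code itself is only visible in the screenshots and the StackBlitz link, so the following is a minimal reconstruction rather than the original. It assumes RxJS 7, `BehaviorSubject` inputs starting at 1 and 2, and a plain sum as the calculation; the `run` helper and its `secondSubFirst` flag are hypothetical and exist only to switch the subscription order.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { map, shareReplay, withLatestFrom } from 'rxjs/operators';

// Hypothetical helper: builds the pipeline and returns everything combined$ receives.
function run(secondSubFirst: boolean): number[][] {
  const input1$ = new BehaviorSubject(1);
  const input2$ = new BehaviorSubject(2);

  // Assumed calculation: a plain sum, shared and replayed to late subscribers.
  const calculated$ = combineLatest([input1$, input2$]).pipe(
    map(([a, b]) => a + b),
    shareReplay(1)
  );

  const seen: number[][] = [];
  const combined$ = calculated$.pipe(withLatestFrom(input1$, input2$));

  if (secondSubFirst) calculated$.subscribe(); // secondSub before combined$
  combined$.subscribe(([sum, a, b]) => seen.push([sum, a, b]));
  if (!secondSubFirst) calculated$.subscribe(); // secondSub after combined$

  input1$.next(5);
  return seen;
}

console.log(run(true));  // [[3, 1, 2], [7, 1, 2]]: the new sum is paired with the old input1$ value
console.log(run(false)); // [[3, 1, 2], [7, 5, 2]]
```

The only difference between the two calls is where the otherwise unrelated `calculated$.subscribe()` sits.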

After the discussion I now understand the behavior, but it is a dangerous issue for me because it produces silent errors.

Reasoning to open an issue:

For me, the correct way to handle withLatestFrom and combineLatest would be to subscribe to the combined observables first. Even in other use cases, combineLatest is a combination of multiple observables acting as a source, and that should happen before any piped operator. In some use cases this makes no difference, but in my opinion it would be more consistent with how users expect it to work.

The current behavior is, as you said, not obvious and can cause silent errors in a lot of places. In my case, the only reason I found it at all was a division by zero caused by the old value. The secondSub in my case lives in another TypeScript file, and input1, input2 and calculated are part of a module-wide "store" that all components can access and use to share values.

I know RxJS sometimes behaves that way, but it breaks any separation-of-concerns approach when the order of independent subscribers causes different values in the value streams. secondSub and combined have nothing to do with each other and do not interact directly; they only use the same source value.

benlesh commented 2 years ago

This seems to be a misunderstanding about source.pipe(withLatestFrom(a, b)) and synchronous observables. The source is what dictates when it will emit, and the source is the first thing it subscribes to. Therefore, if source is derived from a or b synchronously, then by the time the result of withLatestFrom emits, it hasn't seen the new value of a yet. This is all working as expected.

To get what you want, you'd want to schedule the emissions from source, like source.pipe(observeOn(asapScheduler), withLatestFrom(a, b)), or something to that effect.
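A minimal sketch of that suggestion, assuming RxJS 7 and the same assumed setup as in the report (two `BehaviorSubject` inputs and a summed `calculated$`). The `tick` and `runDeferred` helpers are hypothetical; `tick` only waits long enough for work scheduled on `asapScheduler` to run.

```typescript
import { BehaviorSubject, asapScheduler, combineLatest } from 'rxjs';
import { map, observeOn, shareReplay, withLatestFrom } from 'rxjs/operators';

// Hypothetical helper: resolves after pending microtasks (and so asap work) have run.
const tick = () => new Promise<void>((resolve) => setTimeout(resolve, 0));

async function runDeferred(): Promise<number[][]> {
  const input1$ = new BehaviorSubject(1);
  const input2$ = new BehaviorSubject(2);
  const calculated$ = combineLatest([input1$, input2$]).pipe(
    map(([a, b]) => a + b),
    shareReplay(1)
  );

  const seen: number[][] = [];
  calculated$.subscribe(); // independent subscriber, subscribed first
  calculated$
    .pipe(observeOn(asapScheduler), withLatestFrom(input1$, input2$))
    .subscribe(([sum, a, b]) => seen.push([sum, a, b]));

  await tick(); // let the replayed initial value through
  input1$.next(5);
  await tick(); // let the deferred new sum through
  return seen;
}

runDeferred().then((seen) => console.log(seen)); // [[3, 1, 2], [7, 5, 2]]
```

The trade-off is that every value from the source now arrives asynchronously, so a source value that is still queued when an input changes gets paired with the newer input value.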

Either way, if you find yourself doing a lot of synchronous combining like this, it might be worth considering that it all could be done in a single step?

Thank you for your issue.

begrs commented 2 years ago

Hi, thank you for your time. The reason why it behaves that way was already explained by voliva in the discussion. Following your statement "Therefore, if source is derived from a or b synchronously, then by the time the result of withLatestFrom emits, it hasn't seen the new value of a yet." would mean that no matter what any of the child observables do, the source will always have a newer value than a or b. That would be consistent behavior and would not be an issue for me. But that is NOT the case. In the example code, if I move secondSub behind the declaration of combined$, the output changes. With that edit, combined$ actually receives the updated value, not the old one that your comment would predict. Again, the steps that lead to this have been explained by voliva, but the order of two otherwise independent observables should not influence the outcome in such a significant way.

Regarding "Either way, if you find yourself doing a lot of synchronous combining like this, it might be worth considering that it all could be done in a single step?" -> I would love to do that, but the observables are used in many parts of the application and as source for even more calculations. The best solution in that regard was proposed by gneu77 - packing the input parameters of the calculation with each emitted value. I will use that, but again, this is not why I opened the issue.
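A rough sketch of what packing the inputs with each emitted value could look like, assuming RxJS 7 and the same assumed sum calculation as before. The `Calculated` shape and the `runPacked` helper are hypothetical and not taken from gneu77's post.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';

// Hypothetical result shape: the computed value plus the inputs it was computed from.
interface Calculated {
  a: number;
  b: number;
  sum: number;
}

function runPacked(secondSubFirst: boolean): Calculated[] {
  const input1$ = new BehaviorSubject(1);
  const input2$ = new BehaviorSubject(2);
  const calculated$ = combineLatest([input1$, input2$]).pipe(
    map(([a, b]): Calculated => ({ a, b, sum: a + b })),
    shareReplay(1)
  );

  const seen: Calculated[] = [];
  if (secondSubFirst) calculated$.subscribe(); // unrelated subscriber, before
  // The consumer reads the inputs from the emission itself, so no withLatestFrom is needed.
  calculated$.subscribe((value) => seen.push(value));
  if (!secondSubFirst) calculated$.subscribe(); // unrelated subscriber, after

  input1$.next(5);
  return seen;
}

console.log(runPacked(true));  // [{ a: 1, b: 2, sum: 3 }, { a: 5, b: 2, sum: 7 }]
console.log(runPacked(false)); // same output
```

Because each emission carries the inputs that produced it, the consumer no longer reads input1$ and input2$ separately, and the order of other subscribers stops mattering.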

Kind regards

benlesh commented 2 years ago

Okay, so looking through this, I now see what you're saying. It seems like the root of the issue is that combineLatest subscribes to the source observable first, and withLatestFrom subscribes to the source observable last. Looking through our core semantics guidelines, we don't really have a defined behavior for what is right there. The closest thing is that we always subscribe to notifiers first. At best, this could be interpreted as okay for combineLatest, where all observables "notify" of a new emission, in which case withLatestFrom is incorrect, as its only "notifier" would be the source, which is subscribed to last. At worst, this semantic is unrelated and we need to come up with something else.
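The difference in subscription order can be observed directly. The following is a small sketch, assuming RxJS 7; the `tracked` helper is hypothetical and only records the name of each observable at the moment it is subscribed to.

```typescript
import { combineLatest, defer, of } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

// Hypothetical helper: an observable that records its name when subscribed to.
const tracked = (name: string, log: string[]) =>
  defer(() => {
    log.push(name);
    return of(name);
  });

// combineLatest subscribes to its inputs in array order, so "source" comes first.
const combineOrder: string[] = [];
combineLatest([
  tracked('source', combineOrder),
  tracked('a', combineOrder),
  tracked('b', combineOrder),
]).subscribe();

// withLatestFrom subscribes to the other inputs first and to its source last.
const withLatestOrder: string[] = [];
tracked('source', withLatestOrder)
  .pipe(withLatestFrom(tracked('a', withLatestOrder), tracked('b', withLatestOrder)))
  .subscribe();

console.log(combineOrder);    // ['source', 'a', 'b']
console.log(withLatestOrder); // ['a', 'b', 'source']
```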

I think, intuitively, withLatestFrom should subscribe to the source first. However, this would result in very strange behaviors with synchronous observables. The best we could do to accommodate this is subscribe to everything, source first, then emit the first value (if there is one) after all subscriptions are started.

Thank you for diligently pleading your case, @begrs, I'll reopen this one and have a look. We might not be able to change this behavior until a major release, though.