ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Known methods for coalescing / synchronizing value emission #568

Open alexweej opened 2 years ago

alexweej commented 2 years ago

This seems to be a more general Rx issue, but specifically we're looking for solutions in the RxCpp space.

If we have the following:

observable<int> a_obs = /*...*/;
observable<int> a_squared_obs = a_obs.map([](int const a) { return a * a; });
observable<int> x_obs =
    combine_latest(
        a_obs,
        a_squared_obs,
        [](int const a, int const a_squared) { return a + a_squared; }
    );

then we observe x_obs emitting nonsensical intermediate values whenever a_obs emits a value. x_obs is expected to always equal a + a^2, but because each of the input observables updates independently, we get transitional values.
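
To make the transitional values concrete, here is a minimal sketch that models the behaviour with the standard library only, not RxCpp itself. `CombineLatest2` and `run_combine_latest` are hypothetical names invented for this illustration, and the sketch assumes that for each value of a, the direct a input is notified before the a * a input.

```cpp
#include <vector>

// Hypothetical stand-in for combine_latest over two int inputs (not the RxCpp
// implementation): it remembers the latest value of each input and emits
// a + a_squared on every update once both inputs have produced a value.
struct CombineLatest2 {
    bool has_a = false, has_sq = false;
    int latest_a = 0, latest_sq = 0;
    std::vector<int> emitted;

    void on_a(int a)   { latest_a = a;   has_a = true;  emit(); }
    void on_sq(int sq) { latest_sq = sq; has_sq = true; emit(); }

    void emit() {
        if (has_a && has_sq) emitted.push_back(latest_a + latest_sq);
    }
};

// Assumption: each a reaches the "a" input first and a * a reaches the
// "a squared" input second, like two subscribers notified one after the other.
std::vector<int> run_combine_latest(const std::vector<int>& inputs) {
    CombineLatest2 x;
    for (int a : inputs) {
        x.on_a(a);       // pairs the new a with the stale a_squared
        x.on_sq(a * a);  // then the consistent pair is emitted
    }
    return x.emitted;
}
```

Under these assumptions, `run_combine_latest({1, 2, 3})` yields 2, 3, 6, 7, 12. The values 3 and 7 are the transitional ones: they pair the new a with the previous a squared.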

Our general problem extends beyond this synchronous case to pipelines that use concat_map etc. with async processes and arbitrary streams of data for each input value, but we think this basic example illustrates something we're missing, either in our understanding of Rx or in the feature set of RxCpp.

Please let us know if you have any ideas.

Thanks!

victimsnino commented 2 years ago

Looks like you need the zip operator instead?

alexweej commented 2 years ago

Ah yes thanks, that does actually work well for this special case of having exactly 1 input for every 1 output. I think we have a problem that involves a more complicated case that I'll try to form an example of and get back to you tomorrow.

Thanks again!
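
As an illustration of why zip suits the one-output-per-input case discussed above, here is a minimal sketch that models zip with the standard library only, not RxCpp itself. `Zip2` and `run_zip` are hypothetical names invented for this illustration.

```cpp
#include <deque>
#include <vector>

// Hypothetical stand-in for zip over two int inputs (not the RxCpp
// implementation): it queues values per input and emits a + a_squared only
// when it can pair the i-th value of one input with the i-th of the other.
struct Zip2 {
    std::deque<int> pending_a, pending_sq;
    std::vector<int> emitted;

    void on_a(int a)   { pending_a.push_back(a);   emit(); }
    void on_sq(int sq) { pending_sq.push_back(sq); emit(); }

    void emit() {
        while (!pending_a.empty() && !pending_sq.empty()) {
            emitted.push_back(pending_a.front() + pending_sq.front());
            pending_a.pop_front();
            pending_sq.pop_front();
        }
    }
};

// Same feeding order as in the question: a first, then a * a.
std::vector<int> run_zip(const std::vector<int>& inputs) {
    Zip2 x;
    for (int a : inputs) {
        x.on_a(a);       // queued, nothing emitted yet
        x.on_sq(a * a);  // completes the pair, one consistent value emitted
    }
    return x.emitted;
}
```

In this model, `run_zip({1, 2, 3})` yields 2, 6, 12, with no transitional values. The pairing is purely positional, so it stays consistent only while the derived input produces exactly one value per source value; if it produces none or several, the pairs drift out of step.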