ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.03k stars 390 forks source link

Diamond graphs for observable aren't composable? #484

Closed iam closed 5 years ago

iam commented 5 years ago

When trying to design an algorithm that requires turning a regular rx chain into a diamond (imagine there's no multi-threading necessary here), there seems to be a lack of composability:

     A
   /   \
  B     C
   \   /
     D
     |
     E

(A, B, C, D, E are some linear chains)

The (only working) approach I could figure out to this would look like:

auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E | subscribe(...);
A.connect();                    // <<<<< not composable

The problem is that calling connect doesn't compose, so the whole graph can't just be expressed as an observable<typeof(E)>.

What's the best practice to solve this? Should we change some operators to make this easier?


RxJava has autoConnect which rxcpp seems to lack.

ref_count would almost work except it calls connect too soon (when B is subscribed, but before waiting for C).


Possible solutions:

auto A = ... | publish() | auto_connect(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E | subscribe(...);
auto A = ... | publish() | ref_count(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E | subscribe(...);
auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C) | ref_count(A);
auto E = D | ...;

E | subscribe(...);
auto A = <src> | fork(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E.subscribe(...);
/*
  calls D.subscribe which:
  1) calls B.subscribe which calls A.subscribe -> the fork doesn't let it through
  2) calls C.subscribe which calls A.subscribe -> the fork N==2, so call the <src> subscribe

  and then the whole graph is subscribed at <src>
*/

fork(N) waits for N subscribers before subscribing to source. it forwards all the observer callbacks to all its subscribers. unsubscribe when subscriber count reaches 0.

(and this could also avoid all the mutex overhead of publish/ref_count).


What do you think? I'd be happy to write a PR if I knew what was the best way.

This has been bothering me for quite some time in some code I wrote for a project of mine, I ended up returning a pair<observable<T>, connectable_observable<Y>> the design of which "broke" immediately once I tried to integrate this into another larger observable chain.

kirkshoop commented 5 years ago

I like 'Change ref_count to take another observable' the best.

avoid all the mutex overhead of publish/ref_count

To do this write a new subject that does no synchronization and use that with the multicast operator to replace publish. This only works if there is only one thread (or external synchronization) for both the producer and all calls to subscribe - also be careful of reentrancy.

iam commented 5 years ago

Sounds good to me, I uploaded a PR with that approach you suggested works best.