ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

One sequential processor - multiple streams scenario #553

Open sergrt opened 3 years ago

sergrt commented 3 years ago

Hello.

Is there any way to implement the following pattern with RxCpp without using locks? The scenario is:

1) Component A creates an rxcpp subject, starts a new OS thread, and produces a stream of values by calling sentences_stream_.get_subscriber().on_next(data);
2) Component B (created on the same thread as component A) subscribes to component A's subject (sentences_stream_). As soon as data is produced by the other thread, it is processed on that thread.
3) Component C wants to supply component B with additional data of the same type as component A does.

The trick is that values from component A and component C should be processed one after another, without overlapping in time. Imagine, for example, that component B writes data to the console character by character. To prevent a mess, data from A and C should either be processed on one thread or go through some sort of scheduler.

I found a solution like this: store the observables in a container (a vector) and iterate over them with merge, so that values from A and C are processed on one new thread:

rxcpp::observable<>::iterate(observables_).merge(rxcpp::observe_on_new_thread()).subscribe(
// ...
);

But in this case I lose another important feature: component B cannot unsubscribe from component C. From what I have observed, this is also not how it should be done, because storing observables in a vector does not solve the unsubscription problem. It does not seem possible to erase a particular observable based on comparison (at least in my case, calling "find" with an observable that was supplied earlier returns nothing).

So is it possible to implement this "one sequential processor - multiple streams" pattern without using locks?