ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.03k stars 390 forks source link

I am curious about an optimization problem #523

Open ASaltedFishBoy opened 4 years ago

ASaltedFishBoy commented 4 years ago

I am curious about an optimization problem When a stream is not used directly, will it be compressed to the next stream?

This problem comes from the following code

observable getMessage$() { return m_clientChanged$.get_observable ().scan (observable() , [] (observable sendMessage$ , std::pair<Client, bool> client) { if (client.second) { sendMessage$ = sendMessage$.merge (client.first->getMessage$ ()); } else { sendMessage$ = sendMessage$.filter ([ = ] (MessagePtr message) { return message->getMessage ()->getTo () == client.first->getServer (); }); } return sendMessage$; }).switch_on_next(); }

I worry that this code will cause more and more nesting of merge and filter due to the increase of time, causing performance problems

kirkshoop commented 4 years ago

Yes, this will be a problem. Use a take_until or take_while before the merge to allow another stream to unsubscribe the client instead of adding a filter to remove the clients messages.