==================
0x70000ee59000 START
==================
0x70000eedc000 START
1
0x70000ee59000 END
==================
2
0x70000eedc000 END
==================
==================
0x70000ef5f000 START
==================
0x70000efe2000 START
1
0x70000efe2000 END
==================
2
0x70000ef5f000 END
==================
==================
0x70000ee59000 START
==================
0x70000eedc000 START
1
0x70000ee59000 END
==================
2
0x70000eedc000 END
==================
==================
0x70000ef5f000 START
==================
0x70000efe2000 START
1
0x70000ef5f000 END
==================
2
0x70000efe2000 END
==================
==================
0x70000ee59000 START
==================
0x70000eedc000 START
1
0x70000ee59000 END
==================
2
0x70000eedc000 END
As you can see, it is mixed, but ReactiveX requires that any observable should be serialized.
In case of using any valid scheduler in merge like observe_on_new_thread output is valid:
==================
0x700001ce1000 START
1
0x700001ce1000 END
==================
==================
0x700001ce1000 START
2
0x700001ce1000 END
==================
==================
0x700001ce1000 START
1
0x700001ce1000 END
==================
==================
0x700001ce1000 START
2
0x700001ce1000 END
==================
==================
0x700001ce1000 START
1
0x700001ce1000 END
==================
==================
0x700001ce1000 START
2
0x700001ce1000 END
==================
==================
0x700001ce1000 START
1
0x700001ce1000 END
==================
==================
0x700001ce1000 START
2
0x700001ce1000 END
==================
==================
0x700001ce1000 START
1
0x700001ce1000 END
==================
==================
0x700001ce1000 START
2
0x700001ce1000 END
==================
Expected: at least default behavior of any operator should be thread-safe...
In my understanding, best default scheduler for such an "multhithreaded" opertators can be "serialize_immediate" (not exist, but actually just emit emissions under mutex to provide exclusive access to subscriber and guarantee that only one observable pushes item at the same time). Tested locally: also provides valid output
BTW: it is how i've implemented merge in ReactivePlusPlus: each callback to subscriber of merge just called under mutex. As a result there is no way to obtain "mixed" log. @kirkshoop, what do you think ?
Hi everyone and hi @kirkshoop!
Most of the operators in rxcpp which requires schedulers has such an fallback scheduler:
For example, merge operator also has it as default. BUT it doesn't provide ANY synchronization/serialization in case of multithreaded application.
Example:
Possible output:
As you can see, it is mixed, but ReactiveX requires that any observable should be serialized.
In case of using any valid scheduler in merge like
observe_on_new_thread
output is valid:Expected: at least default behavior of any operator should be thread-safe...
In my understanding, best default scheduler for such an "multhithreaded" opertators can be "serialize_immediate" (not exist, but actually just emit emissions under mutex to provide exclusive access to subscriber and guarantee that only one observable pushes item at the same time). Tested locally: also provides valid output
BTW: it is how i've implemented merge in ReactivePlusPlus: each callback to subscriber of merge just called under mutex. As a result there is no way to obtain "mixed" log. @kirkshoop, what do you think ?