ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Question about the coordinator in sources and why it causes segfault #579

Open wuyuanyi135 opened 2 years ago

wuyuanyi135 commented 2 years ago

The following code crashes. It is an Rx subscription that is supposed to iterate over the data sequentially. (The sequencing does not matter much here, but it would if I introduced a delay after just.)

I understand how a scheduler (coordinator) works for a deferred operation such as delay, but I don't see the point of supplying a coordinator to sources such as just or iterate. I simply passed the same coordinator to every Rx function that accepts one, but in this case that apparently fails. If I remove the coordinator from just, the code runs correctly.

Why shouldn't I use the coordinator in this case, and when should I use it?

  schedulers::run_loop rl;
  identity_one_worker c(schedulers::make_run_loop(rl));
  sources::iterate(data, c)
      .concat_map(
          [&](int v) {
            return sources::just(v, c);
          },
          c)
      .subscribe([](int v) {
        std::cout << "Next " << v << "\n";
      });

  while (true) {
    rl.dispatch();
  }
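For readers unfamiliar with the dispatch loop above, the deferred behaviour can be modelled in plain C++ without RxCpp. This is only a rough sketch: `toy_run_loop` and `emit_through_loop` are hypothetical names invented for illustration and do not reflect how `schedulers::run_loop` is actually implemented. It shows only that work handed to a queue-based loop does not run until `dispatch()` is called.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical toy queue; it only mimics the "nothing runs until dispatch()" idea.
class toy_run_loop {
public:
    // Queues a task without running it.
    void schedule(std::function<void()> task) { tasks_.push(std::move(task)); }

    // Runs at most one queued task; returns false when nothing was queued.
    bool dispatch() {
        if (tasks_.empty()) {
            return false;
        }
        std::function<void()> task = std::move(tasks_.front());
        tasks_.pop();
        task();
        return true;
    }

private:
    std::queue<std::function<void()>> tasks_;
};

// Schedules one task per value, then drains the loop.
// Returns the values in the order the tasks actually ran.
std::vector<int> emit_through_loop(const std::vector<int>& data) {
    toy_run_loop loop;
    std::vector<int> seen;
    for (int v : data) {
        loop.schedule([&seen, v] { seen.push_back(v); });
    }
    assert(seen.empty());  // scheduling alone ran nothing
    while (loop.dispatch()) {
        // keep dispatching until the queue is empty
    }
    return seen;
}
```

Calling `emit_through_loop({1, 2, 3})` returns the values in their original order, but only because the loop is drained before returning; an immediate scheduler would instead run each task at the point where it is scheduled.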

wuyuanyi135 commented 2 years ago

From my observation, the following case also fails. Does this mean I should never pass a coordinator to a source?

  std::vector<int> data{1, 2, 3, 4, 5};
  schedulers::run_loop rl;
  identity_one_worker c(schedulers::make_run_loop(rl));
  sources::iterate(data, c)
      .tap([](auto _) {std::cout << "before delay\n";})
      .delay(std::chrono::milliseconds(500))
      .tap([](auto _) {std::cout << "after delay\n";})
      .subscribe([](int v) {
        std::cout << "Next " << v << "\n";
      });

wuyuanyi135 commented 2 years ago

I started investigating this issue. I found that the crash was caused by the following line. https://github.com/ReactiveX/RxCpp/blob/761b932a80e2be6e2b62d232e754bd96fc448946/Rx/v2/src/rxcpp/rx-scheduler.hpp#L349

For some reason, the weak_ptr returned NULL; presumably the underlying shared pointer had already been released.
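The general mechanism suspected here can be reproduced with the standard library alone. The sketch below is not RxCpp code: `worker_state`, `still_alive`, and `alive_after` are hypothetical names used only to show that `std::weak_ptr::lock()` yields an empty pointer once the last owning `shared_ptr` is gone. Whether that is what actually happens at the linked line is still an assumption.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for state shared with a worker; not an RxCpp type.
struct worker_state {
    int pending = 0;
};

// Returns true when the object observed through `weak` is still alive.
bool still_alive(const std::weak_ptr<worker_state>& weak) {
    // lock() returns an empty shared_ptr once the last owner has been released.
    std::shared_ptr<worker_state> strong = weak.lock();
    return static_cast<bool>(strong);
}

// Creates an owner, takes a weak reference, optionally drops the owner,
// and reports whether the weak reference can still be locked.
bool alive_after(bool release_owner) {
    std::shared_ptr<worker_state> owner = std::make_shared<worker_state>();
    std::weak_ptr<worker_state> weak = owner;
    assert(still_alive(weak));  // alive while the owner exists
    if (release_owner) {
        owner.reset();  // last strong reference gone; the object is destroyed
    }
    return still_alive(weak);
}
```

Dereferencing the result of `lock()` without checking it, in the case where `alive_after(true)` reports false, is undefined behaviour and typically shows up as a segfault.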

wuyuanyi135 commented 2 years ago

One possibility is that the sources (like just or iterate) use the identity_immediate coordinator by default: https://github.com/ReactiveX/RxCpp/blob/761b932a80e2be6e2b62d232e754bd96fc448946/Rx/v2/src/rxcpp/sources/rx-iterate.hpp#L277 This makes more sense to me, as emitting values does not sound like an asynchronous operation (except for interval). Passing the run_loop coordinator does not seem like a good choice, but why does it crash?

kirkshoop commented 2 years ago

Can you add the stack for the crash?

wuyuanyi135 commented 2 years ago

@kirkshoop Sorry for the late response. Here is the stack (I am using the master branch of rxcpp):

rxcpp::schedulers::worker::schedule_rebind<…>(const rxcpp::schedulers::schedulable &) const rx-scheduler.hpp:815
rxcpp::schedulers::worker::schedule rx-scheduler.hpp:260
rxcpp::schedulers::schedulable::schedule rx-scheduler.hpp:621
rxcpp::schedulers::detail::action_tailrecurser::operator() rx-scheduler.hpp:715
std::__invoke_impl<…> invoke.h:61
std::__invoke_r<…> invoke.h:111
std::_Function_handler::_M_invoke(const std::_Any_data &, const rxcpp::schedulers::schedulable &, const rxcpp::schedulers::recurse &) std_function.h:291
std::function::operator()(const rxcpp::schedulers::schedulable &, const rxcpp::schedulers::recurse &) const std_function.h:560
rxcpp::schedulers::detail::action_type::operator() rx-scheduler.hpp:679
rxcpp::schedulers::action::operator() rx-scheduler.hpp:727
rxcpp::schedulers::schedulable::operator() rx-scheduler.hpp:645
rxcpp::schedulers::run_loop::dispatch rx-runloop.hpp:192
main emitting_a_list_simultaneously_or_sequentially.cpp:37
__tmainCRTStartup 0x00007ff72cc9143f
mainCRTStartup 0x00007ff72cc91146
BaseThreadInitThunk 0x00007ff8045554e0
RtlUserThreadStart 0x00007ff8049a485b
<unknown> 0x0000000000000000