ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.03k stars 390 forks source link

subjects::synchronize on_completed deadlock #490

Open lebdron opened 5 years ago

lebdron commented 5 years ago
rxcpp::composite_subscription lifetime;
rxcpp::subjects::synchronize<int, rxcpp::observe_on_one_worker> sub(
    rxcpp::observe_on_new_thread(), lifetime);
std::thread t([&sub] {
  std::this_thread::sleep_for(std::chrono::seconds(2));
  printf("OnCompleted\n");
  sub.get_subscriber().on_completed();
});
printf("Subscribe\n");
sub.get_observable().as_blocking().subscribe();
t.join();

lifetime.unsubscribe();

The code blocks due to this unsubscribe call, which is done before new_thread has time to execute on_completed schedulable: https://github.com/ReactiveX/RxCpp/blob/7c79c18ab252eb37bcf122c75070f8d557868c08/Rx/v2/src/rxcpp/rx-subscriber.hpp#L90 If a breakpoint is set on this line, then another thread has its time to execute on_completed and unblock main thread.

Edit: sleep_for was added to ensure that subscription is done before on_completed call, and it can be verified on execution.

iam commented 5 years ago

Which line does it block on?

Do you have the backtrace showing where it's deadlocking (for the 2 respective threads), and/or a simpler example?

lebdron commented 5 years ago

It blocks on sub.get_observable().as_blocking().subscribe();

Stacktrace: https://gist.github.com/lebdron/041814233e3a7877217887b5715856eb

I am not sure if I can make a simpler example since it requires multiple threads to trigger the lock.

The main point is that when the scheduler (which is in another thread, produced by rxcpp::observe_on_new_thread()) gets to pop the on_completed item from the queue, the caller thread (std::thread t) already triggered unsubscribe in completeddetacher, and therefore the scheduler does not execute the schedulable.