ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.03k stars 390 forks source link

Spurious race condition (maybe false positive) #532

Open daixtrose opened 4 years ago

daixtrose commented 4 years ago

Maybe a false positive, maybe a layer 8 problem, but here we go:

#include <rxcpp/rx.hpp>
#include <thread>

class X {
    rxcpp::subjects::synchronize<int, rxcpp::observe_on_one_worker> subject_;
    std::thread t_;

public:
    X()
        : subject_{ rxcpp::observe_on_new_thread() }
        , t_{ [this]() {} } {}

    ~X() {
        subject_.get_subscriber(); // .on_next(...);
        t_.join();
    }
};

int main() {
    X x{};
    return 0;
}

My environment: Ubuntu 18.04 LTS

uname -a
Linux ubuntu 4.15.0-91-generic #92-Ubuntu SMP Fri Feb 28 11:09:48 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
/usr/bin/c++ --version
c++ (Ubuntu 9.2.1-17ubuntu1~18.04.1) 9.2.1 20191102

Compile with

/usr/bin/c++ -I/my/path/to/rxcpp/Rx/v2/src -g   -pedantic -fsanitize=undefined -fno-omit-frame-pointer -Wuninitialized -Wunused-parameter -Wno-psabi -fPIC -fsanitize=thread -std=gnu++2a test.cpp
==================
WARNING: ThreadSanitizer: data race (pid=18324)
  Write of size 8 at 0x7b0400000800 by main thread:
    #0 pipe <null> (libtsan.so.0+0x30fba)
    #1 <null> <null> (libubsan.so.1+0x1bb5f)
    #2 std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >, std::allocator<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > >, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >(rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >*&, std::_Sp_alloc_shared_tag<std::allocator<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > > >, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void>&&) /usr/include/c++/9/bits/shared_ptr_base.h:679 (a.out+0xd08d1)
    #3 std::__shared_ptr<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > >, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >(std::_Sp_alloc_shared_tag<std::allocator<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > > >, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void>&&) /usr/include/c++/9/bits/shared_ptr_base.h:1344 (a.out+0xce1c1)
    #4 std::shared_ptr<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > >::shared_ptr<std::allocator<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > >, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >(std::_Sp_alloc_shared_tag<std::allocator<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > > >, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void>&&) /usr/include/c++/9/bits/shared_ptr.h:359 (a.out+0xcbd15)
    #5 std::shared_ptr<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > > std::allocate_shared<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >, std::allocator<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > >, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >(std::allocator<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > > const&, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void>&&) /usr/include/c++/9/bits/shared_ptr.h:702 (a.out+0xc80c2)
    #6 std::shared_ptr<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> > > std::make_shared<rxcpp::detail::specific_observer<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >(rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void>&&) /usr/include/c++/9/bits/shared_ptr.h:718 (a.out+0xc3920)
    #7 std::shared_ptr<rxcpp::detail::virtual_observer<int> > rxcpp::observer<int, void, void, void, void>::make_destination<rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >(rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void>) /my/path/to/rxcpp/Rx/v2/src/rxcpp/rx-observer.hpp:408 (a.out+0xbe455)
    #8 rxcpp::observer<int, void, void, void, void>::observer<rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >(rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void>) /my/path/to/rxcpp/Rx/v2/src/rxcpp/rx-observer.hpp:426 (a.out+0xb7726)
    #9 rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void>::as_dynamic() const /my/path/to/rxcpp/Rx/v2/src/rxcpp/rx-observer.hpp:254 (a.out+0xaf972)
    #10 rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>, void, void, void> >::as_dynamic() const /my/path/to/rxcpp/Rx/v2/src/rxcpp/rx-subscriber.hpp:170 (a.out+0xa87c6)
    #11 rxcpp::subjects::detail::synchronize_observer<int, rxcpp::observe_on_one_worker>::get_subscriber() const /my/path/to/rxcpp/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp:147 (a.out+0x9f79a)
    #12 rxcpp::subjects::synchronize<int, rxcpp::observe_on_one_worker>::get_subscriber() const /my/path/to/rxcpp/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp:180 (a.out+0x95f64)
    #13 X::~X() /path/to/test.cpp:14 (a.out+0x8c20f)
    #14 main /path/to/test.cpp:19 (a.out+0x758ec)

  Previous write of size 8 at 0x7b0400000800 by thread T2:
    #0 pipe <null> (libtsan.so.0+0x30fba)
    #1 <null> <null> (libubsan.so.1+0x1bb5f)
    #2 std::thread::_State_impl<std::thread::_Invoker<std::tuple<X::X()::{lambda()#1}> > >::~_State_impl() /usr/include/c++/9/thread:187 (a.out+0xdc945)
    #3 <null> <null> (libstdc++.so.6+0xd0879)

  Thread T2 (tid=18327, finished) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x2d3be)
    #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) <null> (libstdc++.so.6+0xd0b04)
    #2 X::X() /path/to/test.cpp:11 (a.out+0x8bddd)
    #3 main /path/to/test.cpp:19 (a.out+0x758e0)

SUMMARY: ThreadSanitizer: data race (/usr/lib/x86_64-linux-gnu/libtsan.so.0+0x30fba) in pipe
==================
ThreadSanitizer: reported 1 warnings

Any ideas?

daixtrose commented 4 years ago

Note that reordering the instructions in the destructor resolves the issue 😱

    ~X() {
        t_.join();
        subject_.get_subscriber(); // .on_next(...);
    }
daixtrose commented 4 years ago

Confirmed on other platforms and clang.

daixtrose commented 4 years ago

@lebdron any idea what might cause this issue?