ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

unsubscribe causes heap-use-after-free #436

Open Solonets opened 6 years ago

Solonets commented 6 years ago

This code:

auto timer = rxcpp::observable<>::timer(std::chrono::milliseconds(millis));
auto handle = timer.subscribe_on(rxcpp::observe_on_new_thread())
                     .subscribe([this](auto) { std::cout << "TIMER" << std::endl; });
handle.unsubscribe();

When compiled with -fsanitize=address, it reports a heap-use-after-free error:

READ of size 8 at 0x60d000001d80 thread T1

0 0x10cd96cd7 in __asan_memcpy (libclang_rt.asan_osx_dynamic.dylib:x86_64h+0x50cd7)

1 0x10ba588e1 in std::__1::cv_status std::__1::condition_variable::wait_until<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >(std::__1::unique_lock&, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&) chrono:859

2 0x10bb2fca2 in rxcpp::schedulers::new_thread::new_worker::new_worker(rxcpp::composite_subscription, std::__1::function<std::__1::thread (std::__1::function<void ()>)>&)::'lambda0'()::operator()() const rx-newthread.hpp:112

3 0x10bb2eafc in _ZNSt3128invoke_void_return_wrapperIvE6__callIJRZN5rxcpp10schedulers10new_thread10new_workerC1ENS3_22composite_subscriptionERNS_8functionIFNS_6threadENS8_IFvvEEEEEEEUlvE0EEEvDpOT type_traits:4291

4 0x10bb2e4f8 in std::__1::function::func<rxcpp::schedulers::new_thread::new_worker::new_worker(rxcpp::composite_subscription, std::__1::function<std::__1::thread (std::__1::function<void ()>)>&)::'lambda0'(), std::__1::allocator<rxcpp::schedulers::new_thread::new_worker::new_worker(rxcpp::composite_subscription, std::__1::function<std::__1::thread (std::__1::function<void ()>)>&)::'lambda0'()>, void ()>::operator()() functional:1552

5 0x10bb0f32f in std::__1::function<void ()>::operator()() const functional:1903

6 0x10bb0ed47 in void std::__1::thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::thread_struct, std::__1::default_delete >, std::__1::function<void ()> > >(void) type_traits:4291

7 0x7fff6fdf46c0 in _pthread_body (libsystem_pthread.dylib:x86_64+0x36c0)

8 0x7fff6fdf456c in _pthread_start (libsystem_pthread.dylib:x86_64+0x356c)

9 0x7fff6fdf3c5c in thread_start (libsystem_pthread.dylib:x86_64+0x2c5c)

0x60d000001d80 is located 0 bytes inside of 144-byte region [0x60d000001d80,0x60d000001e10)

freed by thread T0 here:

0 0x10cdaa6ab in wrap__ZdlPv (libclang_rt.asan_osx_dynamic.dylib:x86_64h+0x646ab)

1 0x10bb3f85a in std::__1::split_buffer<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >, long long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >, long long> >&>::~split_buffer() new:234

2 0x10bb3e5c4 in std::__1::split_buffer<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >, long long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >, long long> >&>::~split_buffer() __split_buffer:340

3 0x10bb3c34c in void std::__1::vector<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >, long long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >, long long> > >::push_back_slow_path<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >, long long> >(std::__1::pair<rxcpp::schedulers::detail::time_schedulable<std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >, long long>&&) vector:1575

4 0x10bb3af9c in rxcpp::schedulers::detail::schedulable_queue<std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >::push(rxcpp::schedulers::detail::time_schedulable<std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >&&) vector:1611

5 0x10bb14e2d in rxcpp::schedulers::new_thread::new_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const rx-newthread.hpp:135

6 0x10bb1493a in rxcpp::schedulers::new_thread::new_worker::schedule(rxcpp::schedulers::schedulable const&) const rx-newthread.hpp:129

7 0x10bb90328 in std::__1::enable_if<((detail::is_action_function<rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> > >::value) || (is_subscription<rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> > >::value)) && (!(is_schedulable<rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> > >::value)), void>::type rxcpp::schedulers::worker::schedule<void rxcpp::operators::detail::subscribe_on<long, rxcpp::observable<long, rxcpp::dynamic_observable >, rxcpp::observe_on_one_worker>::on_subscribe<rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> > >(rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> >) const::'lambda'(rxcpp::schedulers::schedulable const&) const&>(rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> >&&) const rx-scheduler.hpp:801

8 0x10bb9005e in void rxcpp::operators::detail::subscribe_on<long, rxcpp::observable<long, rxcpp::dynamic_observable >, rxcpp::observe_on_one_worker>::on_subscribe<rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> > >(rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> >) const::'lambda0'()::operator()() const rx-subscribe_on.hpp:119

9 0x10bb90014 in rxcpp::static_subscription<void rxcpp::operators::detail::subscribe_on<long, rxcpp::observable<long, rxcpp::dynamic_observable >, rxcpp::observe_on_one_worker>::on_subscribe<rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> > >(rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> >) const::'lambda0'()>::unsubscribe() const rx-subscription.hpp:63

10 0x10bb8ff42 in rxcpp::subscription::subscription_state<rxcpp::static_subscription<void rxcpp::operators::detail::subscribe_on<long, rxcpp::observable<long, rxcpp::dynamic_observable >, rxcpp::observe_on_one_worker>::on_subscribe<rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> > >(rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> >) const::'lambda0'()> >::unsubscribe() rx-subscription.hpp:99

11 0x10baf5235 in rxcpp::subscription::unsubscribe() const rx-subscription.hpp:175

12 0x10bb04f28 in rxcpp::detail::composite_subscription_inner::composite_subscription_state::unsubscribe()::'lambda'(rxcpp::subscription const&)::operator()(rxcpp::subscription const&) const rx-subscription.hpp:283

13 0x10bb04234 in rxcpp::detail::composite_subscription_inner::composite_subscription_state::unsubscribe() algorithm:879

14 0x10bb03177 in rxcpp::detail::composite_subscription_inner::unsubscribe() rx-subscription.hpp:350

15 0x10bb02ee2 in rxcpp::subscription::subscription_state::unsubscribe() rx-subscription.hpp:99

16 0x10baf5235 in rxcpp::subscription::unsubscribe() const rx-subscription.hpp:175

17 0x10bb99d12 in void rxcpp::operators::detail::subscribe_on<long, rxcpp::observable<long, rxcpp::dynamic_observable >, rxcpp::observe_on_one_worker>::on_subscribe<rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> > >(rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> >) const::'lambda1'()::operator()() const rx-subscribe_on.hpp:123

18 0x10bb99cd4 in rxcpp::static_subscription<void rxcpp::operators::detail::subscribe_on<long, rxcpp::observable<long, rxcpp::dynamic_observable >, rxcpp::observe_on_one_worker>::on_subscribe<rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> > >(rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> >) const::'lambda1'()>::unsubscribe() const rx-subscription.hpp:63

19 0x10bb99c02 in rxcpp::subscription::subscription_state<rxcpp::static_subscription<void rxcpp::operators::detail::subscribe_on<long, rxcpp::observable<long, rxcpp::dynamic_observable >, rxcpp::observe_on_one_worker>::on_subscribe<rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> > >(rxcpp::subscriber<long, rxcpp::observer<long, rxcpp::detail::stateless_observer_tag, iroha::ordering::OrderingServiceImpl::updateTimer()::$_0, void, void> >) const::'lambda1'()> >::unsubscribe() rx-subscription.hpp:99

20 0x10baf5235 in rxcpp::subscription::unsubscribe() const rx-subscription.hpp:175

21 0x10bb04f28 in rxcpp::detail::composite_subscription_inner::composite_subscription_state::unsubscribe()::'lambda'(rxcpp::subscription const&)::operator()(rxcpp::subscription const&) const rx-subscription.hpp:283

22 0x10bb04234 in rxcpp::detail::composite_subscription_inner::composite_subscription_state::unsubscribe() algorithm:879

23 0x10bb03177 in rxcpp::detail::composite_subscription_inner::unsubscribe() rx-subscription.hpp:350

24 0x10bb02ee2 in rxcpp::subscription::subscription_state::unsubscribe() rx-subscription.hpp:99

25 0x10baf5235 in rxcpp::subscription::unsubscribe() const rx-subscription.hpp:175

It looks like the problem occurs when the unsubscribe method tries to delete a thread that is locked.
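One possible reading of the trace above, offered as an assumption rather than a confirmed diagnosis: thread T1 is inside `condition_variable::wait_until` reading a time point, while thread T0 pushes onto the scheduler's queue and the backing `std::vector` reallocates, freeing the storage T1 is still reading. `wait_until` takes its deadline by const reference and releases the mutex while it waits, so a reference into a growable container can dangle. The sketch below shows that general hazard and the usual remedy (copy the deadline by value before waiting) using only the standard library. `timed_queue` and its members are hypothetical names for illustration and are not RxCpp code.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

using clock_type = std::chrono::steady_clock;

// Hypothetical stand-in for a scheduler's timed work queue; NOT RxCpp code.
// Items are handled in FIFO order for brevity (a real scheduler would order
// them by deadline).
struct timed_queue {
    std::mutex lock;
    std::condition_variable wake;
    std::vector<clock_type::time_point> items; // may reallocate on push
    bool done = false;

    void push(clock_type::time_point when) {
        std::lock_guard<std::mutex> guard(lock);
        items.push_back(when); // can free the old storage
        wake.notify_one();
    }

    void stop() {
        {
            std::lock_guard<std::mutex> guard(lock);
            done = true;
        }
        wake.notify_one();
    }

    // Worker loop. Returns how many items became due before stop().
    int run() {
        int fired = 0;
        std::unique_lock<std::mutex> guard(lock);
        while (!done) {
            if (items.empty()) {
                wake.wait(guard);
                continue;
            }
            // Hazard: wake.wait_until(guard, items.front()) would pass a
            // reference into the vector. The mutex is released during the
            // wait, so a concurrent push() may reallocate and leave that
            // reference dangling. Copying the deadline first avoids this.
            const clock_type::time_point when = items.front();
            if (clock_type::now() < when) {
                wake.wait_until(guard, when);
                continue; // re-check state after every wake-up
            }
            items.erase(items.begin());
            ++fired;
        }
        return fired;
    }
};
```

To exercise it, run `run()` on one thread and call `push()` repeatedly from another while the worker is waiting on a far-future deadline. With the by-value copy, the reallocation in `push()` no longer touches anything the waiting thread reads.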

kirkshoop commented 6 years ago

Thank you for the report!

These issues usually take some effort to resolve. I will not have that kind of time for a while. If you or anyone else would like to track this down further, that would be great!

thorstink commented 5 years ago

So I walked into this one as well...

Any ideas on a direction for a fix? Or an okay-ish workaround?

juxeii commented 4 years ago

Any news on this? Same problem here.