ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.03k stars 390 forks source link

Terminate called without an active exception using with_latest_from() #459

Closed thorstink closed 6 years ago

thorstink commented 6 years ago

Hi again :)

I am afraid I stumbled upon a rather complex bug (or I just do not know what I am doing). I will do my best to explain and document this as well as I can, however, if you need anything specific, please let me know. I think I have checked all the obvious source for this error, no obvious unjoined/detached threads, examined the backtrace after a crash (which is definitely outside my comfort zone and or skill set). As far as I understand it, it seems to with_latest_from().

I can not create a minimum example that reproduces the bug at the moment, so I hope I am clear enough through my writing and documentation.The bug however, is persistent, occurs always after a random time.

The application fuses multiple sources of information to estimate the motion of a robot and also optimizes the motion-model while doing so. The application eventually crashes with the note: terminate called without an active exception. Sometimes it happens within 5 minutes, sometimes it takes up to 2 hours for the bug to appear. There is a shared library that defines a class which has a cpp rxcpp::subjects::subject<msg> public member variable. In the main application there is a class-instantiation of this class, auto driver = driver() and then the observable is exposed though, e.g.: auto info_source = driver.subject_.get_observable().

Somewhere further in the main application, the info_source is merged with the output of a complex computation, in the subscription to the merged stream the info it published on a middleware-publisher (in this case ROS's middleware implementation), so in pseudo-pseudo-cpp:

auto driver1 = driver1();
auto info_source = driver.subject_.get_observable();
auto driver2 = driver2();
auto linear_velocity = driver.subject_.get_observable();
...
  auto robot_state = 
    linear_velocity // a source of velocity information
    .debounce(std::chrono::milliseconds(100)) // filter to once every so many milliseconds.
    .observe_on(rxodom) // computation is done on its own thread
    .map([&inertial_odom](const auto& _linear_velocity) 
    {
      // this returns an Eigen::Matrix<double, 16,1>-type variable
      return inertial_odom.cmd(linear_velocity); 
    })
    .publish()
    .ref_count();
...
  // Add the IMU-measurement to the odometry object for integration
  imu
    .with_latest_from(
      [&inertial_odom](const auto& _imu, const auto& _state_vector)
      {
        // pass this information to a thread-safe class
        inertial_odom.imu(_imu.acc_, _imu.gyr_, _imu.t_);
        // transform information bases on the two inputs:
        return tf(_imu, _state_vector);
      }, robot_state)
    .subscribe(
      [&clean_pub](const auto& _msg) { clean_pub.publish(_msg); },  
      [](){ printf("closing clean publisher\n");
      }
    );
...
loop.run() 

The backtrace, which shows more stuff, because the actual application has a bit more stuff going on..

thread id: 140737055995648, return state, count: 9442 // <-- last cout from inertial_odom.cmd(linear_velocity)
terminate called without an active exception

Thread 12 "robot_applicati" received signal SIGABRT, Aborted.
[Switching to Thread 0x7fffd67fc700 (LWP 23862)]
__GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
51  ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) backtrace n
#0  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
#1  0x00007ffff5826801 in __GI_abort () at abort.c:79
#2  0x00007ffff62198fb in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#3  0x00007ffff621fd3a in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#4  0x00007ffff621fd95 in std::terminate() () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#5  0x00005555555fec95 in rxcpp::util::detail::maybe<Eigen::Matrix<double, 16, 1, 0, 16, 1> >::get (this=0x555555b3bb70) at /usr/local/include/rxcpp/rx-util.hpp:633
#6  0x00005555555fa85f in rxcpp::util::detail::maybe<Eigen::Matrix<double, 16, 1, 0, 16, 1> >::maybe (this=0x7fffd67f3d50, other=...) at /usr/local/include/rxcpp/rx-util.hpp:565
#7  0x00005555555f5239 in std::_Head_base<1ul, rxcpp::util::detail::maybe<Eigen::Matrix<double, 16, 1, 0, 16, 1> >, false>::_Head_base (this=0x7fffd67f3d50) at /usr/include/c++/7/tuple:128
#8  0x00005555555f525f in std::_Tuple_impl<1ul, rxcpp::util::detail::maybe<Eigen::Matrix<double, 16, 1, 0, 16, 1> > >::_Tuple_impl (this=0x7fffd67f3d50) at /usr/include/c++/7/tuple:368
#9  0x00005555555f52ac in std::_Tuple_impl<0ul, rxcpp::util::detail::maybe<imu::msg>, rxcpp::util::detail::maybe<Eigen::Matrix<double, 16, 1, 0, 16, 1> > >::_Tuple_impl (this=0x7fffd67f3d50)
    at /usr/include/c++/7/tuple:220
#10 0x00005555555f5313 in std::tuple<rxcpp::util::detail::maybe<imu::msg>, rxcpp::util::detail::maybe<Eigen::Matrix<double, 16, 1, 0, 16, 1> > >::tuple (this=0x7fffd67f3d50)
    at /usr/include/c++/7/tuple:985
#11 0x00005555555f5358 in rxcpp::util::surely<rxcpp::util::detail::maybe<imu::msg>, rxcpp::util::detail::maybe<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > (tpl=std::tuple containing = {...})
    at /usr/local/include/rxcpp/rx-util.hpp:689
#12 0x00005555555826c7 in rxcpp::operators::detail::with_latest_from<rxcpp::identity_one_worker, main(int, char**)::<lambda(auto:24, auto:25)>, rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > > >::<lambda(source_value_type)>::operator()(rxcpp::operators::detail::with_latest_from<rxcpp::identity_one_worker, main(int, char**)::<lambda(auto:24, auto:25)>, rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > > >::<lambda(source_value_type)>::source_value_type) const (__closure=0x555555a995a8, st=...)
    at /usr/local/include/rxcpp/operators/rx-with_latest_from.hpp:170
#13 0x00005555555b2315 in rxcpp::observer<imu::msg, rxcpp::detail::stateless_observer_tag, rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::subscribe_one(std::shared_ptr<State>) const [with int Index = 0; State = rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::on_subscribe(Subscriber) const [with Subscriber = rxcpp::subscriber<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::observer<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::detail::stateless_observer_tag, main(int, char**)::<lambda(auto:26)>, main(int, char**)::<lambda(std::__exception_ptr::exception_ptr)>, main(int, char**)::<lambda()> > >; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::with_latest_from_state_type; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::<lambda(source_value_type)>, rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::subscribe_one(std::shared_ptr<State>) const [with int Index = 0; State = rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::on_subscribe(Subscriber) const [with Subscriber = rxcpp::subscriber<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::observer<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::detail::stateless_observer_tag, main(int, char**)::<lambda(auto:26)>, main(int, char**)::<lambda(std::__exception_ptr::exception_ptr)>, main(int, char**)::<lambda()> > >; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::with_latest_from_state_type; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observab---Type <return> to continue, or q <return> to quit---
le<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::<lambda(std::__exception_ptr::exception_ptr)>, rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::subscribe_one(std::shared_ptr<State>) const [with int Index = 0; State = rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::on_subscribe(Subscriber) const [with Subscriber = rxcpp::subscriber<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::observer<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::detail::stateless_observer_tag, main(int, char**)::<lambda(auto:26)>, main(int, char**)::<lambda(std::__exception_ptr::exception_ptr)>, main(int, char**)::<lambda()> > >; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::with_latest_from_state_type; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::<lambda()> >::on_next(imu::msg &&) const (this=0x555555a995a8, t=...) at /usr/local/include/rxcpp/rx-observer.hpp:333
#14 0x00005555555b018c in rxcpp::detail::specific_observer<imu::msg, rxcpp::observer<imu::msg, rxcpp::detail::stateless_observer_tag, rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::subscribe_one(std::shared_ptr<State>) const [with int Index = 0; State = rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::on_subscribe(Subscriber) const [with Subscriber = rxcpp::subscriber<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::observer<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::detail::stateless_observer_tag, main(int, char**)::<lambda(auto:26)>, main(int, char**)::<lambda(std::__exception_ptr::exception_ptr)>, main(int, char**)::<lambda()> > >; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::with_latest_from_state_type; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::<lambda(source_value_type)>, rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::subscribe_one(std::shared_ptr<State>) const [with int Index = 0; State = rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::on_subscribe(Subscriber) const [with Subscriber = rxcpp::subscriber<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::observer<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::detail::stateless_observer_tag, main(int, char**)::<lambda(auto:26)>, main(int, char**)::<lambda(std::__exception_ptr::exception_ptr)>, main(int, char**)::<lambda()> > >; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::with_latest_from_state_type; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::<lambda(std::__exception_ptr::exception_ptr)>, rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::subscribe_one(std::shared_ptr<State>) const---Type <return> to continue, or q <return> to quit---
 [with int Index = 0; State = rxcpp::operators::detail::with_latest_from<Coordination, Selector, ObservableN>::on_subscribe(Subscriber) const [with Subscriber = rxcpp::subscriber<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::observer<sensor_msgs::Imu_<std::allocator<void> >, rxcpp::detail::stateless_observer_tag, main(int, char**)::<lambda(auto:26)>, main(int, char**)::<lambda(std::__exception_ptr::exception_ptr)>, main(int, char**)::<lambda()> > >; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<long int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::with_latest_from_state_type; Coordination = rxcpp::identity_one_worker; Selector = main(int, char**)::<lambda(auto:24, auto:25)>; ObservableN = {rxcpp::observable<imu::msg, rxcpp::operators::detail::lift_operator<imu::msg, rxcpp::dynamic_observable<imu::msg>, rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker> > >, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::ref_count<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::connectable_observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::multicast<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::observable<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<Eigen::Matrix<double, 16, 1, 0, 16, 1>, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::operators::detail::lift_operator<double, rxcpp::dynamic_observable<dsp::motorspeeds>, rxcpp::operators::detail::map<dsp::motorspeeds, main(int, char**)::<lambda(auto:22)> > >, rxcpp::operators::detail::debounce<double, std::chrono::duration<lon
g int, std::ratio<1, 1000> >, rxcpp::identity_one_worker> >, rxcpp::operators::detail::observe_on<double, rxcpp::observe_on_one_worker> >, rxcpp::operators::detail::map<double, main(int, char**)::<lambda(auto:23)> > > >, rxcpp::subjects::subject<Eigen::Matrix<double, 16, 1, 0, 16, 1> > > > > >}]::<lambda()> > >::on_next(imu::msg &&) const (this=0x555555a99590, t=...)
    at /usr/local/include/rxcpp/rx-observer.hpp:372
#15 0x0000555555651d43 in rxcpp::observer<imu::msg, void, void, void, void>::on_next<imu::msg> (this=0x555555b1afb0, v=...) at /usr/local/include/rxcpp/rx-observer.hpp:439
#16 0x0000555555651408 in rxcpp::subscriber<imu::msg, rxcpp::observer<imu::msg, void, void, void, void> >::nextdetacher::operator()<imu::msg> (this=0x7fffd67f42a0, u=...)
    at /usr/local/include/rxcpp/rx-subscriber.hpp:54
#17 0x0000555555650979 in rxcpp::subscriber<imu::msg, rxcpp::observer<imu::msg, void, void, void, void> >::on_next<imu::msg const&> (this=0x555555b1af90, v=...)
    at /usr/local/include/rxcpp/rx-subscriber.hpp:181
#18 0x000055555565007b in rxcpp::notifications::notification<imu::msg>::on_next_notification::accept (this=0x7fffdc001100, o=...) at /usr/local/include/rxcpp/rx-notification.hpp:143
#19 0x0000555555610212 in rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker>::observe_on_observer<rxcpp::subscriber<imu::msg, rxcpp::observer<imu::msg, void, void, void, void> > >::observe_on_state::ensure_processing(std::unique_lock<std::mutex>&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const (__closure=0x7fffdc01a520, self=...) at /usr/local/include/rxcpp/operators/rx-observe_on.hpp:149
#20 0x0000555555624fd8 in std::_Function_handler<void (rxcpp::schedulers::schedulable const&), rxcpp::operators::detail::observe_on<imu::msg, rxcpp::observe_on_one_worker>::observe_on_observer<rxcpp::subscriber<imu::msg, rxcpp::observer<imu::msg, void, void, void, void> > >::observe_on_state::ensure_processing(std::unique_lock<std::mutex>&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>::_M_invoke(std::_Any_data const&, rxcpp::schedulers::schedulable const&) (__functor=..., __args#0=...) at /usr/include/c++/7/bits/std_function.h:316
#21 0x00005555555c48a1 in std::function<void (rxcpp::schedulers::schedulable const&)>::operator()(rxcpp::schedulers::schedulable const&) const (this=0x7fffdc01a6c0, __args#0=...)
    at /usr/include/c++/7/bits/std_function.h:706
#22 0x00005555555bd6ed in rxcpp::schedulers::detail::action_tailrecurser::operator() (this=0x7fffdc01a6b0, s=..., r=...) at /usr/local/include/rxcpp/rx-scheduler.hpp:712
#23 0x000055555560c788 in std::_Function_handler<void (rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&), rxcpp::schedulers::detail::action_tailrecurser>::_M_invoke(std::_Any_data const&, rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&) (__functor=..., __args#0=..., __args#1=...) at /usr/include/c++/7/bits/std_function.h:316
#24 0x00005555555c47d1 in std::function<void (rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)>::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&) const (this=0x55555688da00, __args#0=..., __args#1=...) at /usr/include/c++/7/bits/std_function.h:706
#25 0x00005555555bd5d7 in rxcpp::schedulers::detail::action_type::operator() (this=0x55555688d9f0, s=..., r=...) at /usr/local/include/rxcpp/rx-scheduler.hpp:679
#26 0x00005555555bd7f0 in rxcpp::schedulers::action::operator() (this=0x7fffd67f45f0, s=..., r=...) at /usr/local/include/rxcpp/rx-scheduler.hpp:727
#27 0x00005555555bd4d1 in rxcpp::schedulers::schedulable::operator() (this=0x7fffd67f45a0, r=...) at /usr/local/include/rxcpp/rx-scheduler.hpp:645
#28 0x00005555555bf065 in rxcpp::schedulers::new_thread::new_worker::new_worker(rxcpp::composite_subscription, std::function<std::thread (std::function<void ()>)>&)::{lambda()#2}::operator()() const (
    __closure=0x555555a95270) at /usr/local/include/rxcpp/schedulers/rx-newthread.hpp:119
#29 0x00005555555cbb8d in std::_Function_handler<void (), rxcpp::schedulers::new_thread::new_worker::new_worker(rxcpp::composite_subscription, std::function<std::thread (std::function<void ()>)>&)::{lambda()#2}>::_M_invoke(std::_Any_data const&) (__functor=...) at /usr/include/c++/7/bits/std_function.h:316
#30 0x00005555555d7204 in std::function<void ()>::operator()() const (this=0x555555a95228) at /usr/include/c++/7/bits/std_function.h:706
#31 0x00005555555cbd55 in std::__invoke_impl<void, std::function<void ()>>(std::__invoke_other, std::function<void ()>&&) (__f=...) at /usr/include/c++/7/bits/invoke.h:60
#32 0x00005555555c5635 in std::__invoke<std::function<void ()>>(std::function<void ()>&&) (__fn=...) at /usr/include/c++/7/bits/invoke.h:95
#33 0x0000555555643bd8 in std::thread::_Invoker<std::tuple<std::function<void ()> > >::_M_invoke<0ul>(std::_Index_tuple<0ul>) (this=0x555555a95228) at /usr/include/c++/7/thread:234
#34 0x00005555556425f7 in std::thread::_Invoker<std::tuple<std::function<void ()> > >::operator()() (this=0x555555a95228) at /usr/include/c++/7/thread:243
#35 0x0000555555640b8a in std::thread::_State_impl<std::thread::_Invoker<std::tuple<std::function<void ()> > > >::_M_run() (this=0x555555a95220) at /usr/include/c++/7/thread:186
#36 0x00007ffff624b733 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#37 0x00007ffff6d1c6db in start_thread (arg=0x7fffd67fc700) at pthread_create.c:463
#38 0x00007ffff590788f in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Using gcc (Ubuntu 7.3.0-16ubuntu3) 7.3.0 and set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_FLAGS "-g")

thorstink commented 6 years ago

If I give combine_latest_with() an own thread, it does not seem to occur. (it ran without crashing for over 12 hours now..)

Maybe I also should have emphasized that I am putting a semi-random sleep in inertial_odom.cmd(linear_velocity); (that often exceeds the 100ms) to sort of simulate the case when a simulation takes too long. This seemed to improve the reproducibility of the bug.

So I guess the issue might lie in when there are too many tasks on a stack or something; I have no idea.

kirkshoop commented 6 years ago

Hi! Thank you for all the detail. Your solution is correct. Operators that combine multiple inputs from different threads must specify a thread-safe coordinator/scheduler. Specifing a new thread coordinator to with_latest_from would fix the issue that you documented. It does not have to be a new thread though. There are ways to reuse threads across multiple operators.

thorstink commented 6 years ago

Yes indeed, I used some rx-thread that was already there. Issue can be closed as far as I am concerned.

There is still much to learn for me in the realms of this frp, rx and applying it to mobile robots, so much fun though :-)