FairRootGroup / FairMQ

C++ Message Queuing Library and Framework
GNU Lesser General Public License v3.0

StateMachine: Guard access to subscription containers #489

Closed rbx closed 1 year ago

rbx commented 1 year ago

Although Signals2 is thread-safe, the containers we store the connections in are not, so access to them should be guarded by a lock.
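As a rough illustration of the idea, the sketch below guards a subscription map with a mutex. It is not the FairMQ code: `SubscriptionRegistry`, `Callback`, `Subscribe`, `Unsubscribe`, `Size`, and `fMtx` are hypothetical names, and a plain `std::function` stands in for `boost::signals2::connection` so that the example only needs the standard library (C++17).

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the subscription bookkeeping; not FairMQ's actual code.
class SubscriptionRegistry
{
  public:
    // Assumption: a std::function replaces boost::signals2::connection here.
    using Callback = std::function<void(int)>;

    void Subscribe(const std::string& key, Callback cb)
    {
        // Every access to the container happens under the same mutex.
        std::lock_guard<std::mutex> lock(fMtx);
        fSubscriptions.insert_or_assign(key, std::move(cb));
    }

    void Unsubscribe(const std::string& key)
    {
        std::lock_guard<std::mutex> lock(fMtx);
        fSubscriptions.erase(key); // erasing a missing key is a no-op
    }

    std::size_t Size() const
    {
        std::lock_guard<std::mutex> lock(fMtx);
        return fSubscriptions.size();
    }

  private:
    mutable std::mutex fMtx; // mutable so that const members can lock it
    std::unordered_map<std::string, Callback> fSubscriptions;
};
```

The point of the sketch is that a thread-safe element type does not make the surrounding `std::unordered_map` safe: concurrent insert and erase calls on the map itself still need external synchronization.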



coderabbitai[bot] commented 1 year ago

Walkthrough

This change improves thread safety in the StateMachine.cxx file of the fairmq module. A new mutex, fSubscriptionsMtx, is added to the Machine_ struct and is locked in the functions that subscribe to and unsubscribe from state changes and new transitions. A new test case, SubscriptionThreadSafety, is added in plugin_services/_control.cxx to exercise the subscription functions from multiple threads.

Changes

| File Path | Change Summary |
|---|---|
| fairmq/StateMachine.cxx | A new mutex fSubscriptionsMtx has been added to the Machine_ struct. Locking has been added to the SubscribeToStateChange, UnsubscribeFromStateChange, SubscribeToNewTransition, and UnsubscribeFromNewTransition functions to ensure thread safety. |
| test/plugin_services/.../_control.cxx | New include directives for `<array>`, `<fairmq/Tools.h>`, `<memory>`, and `<string>` have been added. A new test case SubscriptionThreadSafety has been introduced to test the subscription functionality across multiple threads. |

rbx commented 1 year ago

@coderabbitai review

dennisklein commented 1 year ago

When I remove the mutex lock from UnsubscribeFromStateChange(), I consistently get segfaults inside std::_Hashtable, similar to what @davidrohr reported in O2-4340. For example:

❯ build/test/testsuite_PluginServices "--gtest_filter=PluginServices.SubscriptionThreadSafety"
Note: Google Test filter = PluginServices.SubscriptionThreadSafety
[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from PluginServices
[ RUN      ] PluginServices.SubscriptionThreadSafety
[STATE] Starting FairMQ state machine --> IDLE
[1]    508754 segmentation fault (core dumped)  build/test/testsuite_PluginServices
❯ coredumpctl gdb
Journal file /var/log/journal/5aeb9cde3e564c17ac334e8b781c1416/user-1000@00060550d99625c8-7688b329bcc30c6e.journal~ is truncated, ignoring file.
           PID: 508754 (testsuite_Plugi)
           UID: 1000 (dklein)
           GID: 1000 (dklein)
        Signal: 11 (SEGV)
     Timestamp: Fri 2023-11-10 12:55:56 CET (3s ago)
  Command Line: build/test/testsuite_PluginServices --gtest_filter=PluginServices.SubscriptionThreadSafety
    Executable: /home/dklein/projects/FairMQ/build/test/testsuite_PluginServices
 Control Group: /user.slice/user-1000.slice/user@1000.service/app.slice/tmux.service
          Unit: user@1000.service
     User Unit: tmux.service
         Slice: user-1000.slice
     Owner UID: 1000 (dklein)
       Boot ID: 3c9360435074411d884a98f73eae87c7
    Machine ID: 5aeb9cde3e564c17ac334e8b781c1416
      Hostname: delta
       Storage: /var/lib/systemd/coredump/core.testsuite_Plugi.1000.3c9360435074411d884a98f73eae87c7.508754.1699617356000000.zst (present)
  Size on Disk: 302.5K
       Message: Process 508754 (testsuite_Plugi) of user 1000 dumped core.

                Module libpcre2-8.so.0 from rpm pcre2-10.42-1.fc38.1.x86_64
                Module libz.so.1 from rpm zlib-1.2.13-3.fc38.x86_64
                Module libselinux.so.1 from rpm libselinux-3.5-1.fc38.x86_64
                Module libcrypto.so.3 from rpm openssl-3.0.9-2.fc38.x86_64
                Module libkeyutils.so.1 from rpm keyutils-1.6.1-6.fc38.x86_64
                Module libkrb5support.so.0 from rpm krb5-1.21-3.fc38.x86_64
                Module libcom_err.so.2 from rpm e2fsprogs-1.46.5-4.fc38.x86_64
                Module libk5crypto.so.3 from rpm krb5-1.21-3.fc38.x86_64
                Module libkrb5.so.3 from rpm krb5-1.21-3.fc38.x86_64
                Module libgssapi_krb5.so.2 from rpm krb5-1.21-3.fc38.x86_64
                Module libpgm-5.2.so.0 from rpm openpgm-5.2.122-31.fc38.x86_64
                Module libunwind.so.8 from rpm libunwind-1.6.2-7.fc38.x86_64
                Module libicuuc.so.72 from rpm icu-72.1-2.fc38.x86_64
                Module libicui18n.so.72 from rpm icu-72.1-2.fc38.x86_64
                Module libicudata.so.72 from rpm icu-72.1-2.fc38.x86_64
                Module libzmq.so.5 from rpm zeromq-4.3.4-5.fc38.x86_64
                Module libfmt.so.9 from rpm fmt-9.1.0-2.fc38.x86_64
                Module libFairLogger.so.1.11 from rpm fairlogger-1.11.0-1.fc38.x86_64
                Module libboost_regex.so.1.78.0 from rpm boost-1.78.0-14.fc38.x86_64
                Module libboost_atomic.so.1.78.0 from rpm boost-1.78.0-14.fc38.x86_64
                Module libboost_filesystem.so.1.78.0 from rpm boost-1.78.0-14.fc38.x86_64
                Module libboost_program_options.so.1.78.0 from rpm boost-1.78.0-14.fc38.x86_64
                Module libboost_container.so.1.78.0 from rpm boost-1.78.0-14.fc38.x86_64
                Stack trace of thread 508760:
                #0  0x00007f30949635c5 n/a (/home/dklein/projects/FairMQ/build/fairmq/libfairmq.so.1.8.0 + 0x5635c5)
                #1  0x000000000000000c n/a (n/a + 0x0)
                #2  0x00000000355f7265 n/a (n/a + 0x0)
                ELF object binary architecture: AMD x86-64

GNU gdb (GDB) Fedora Linux 13.2-6.fc38
Copyright (C) 2023 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.
Type "show copying" and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
Type "show configuration" for configuration details.
For bug reporting instructions, please see:
<https://www.gnu.org/software/gdb/bugs/>.
Find the GDB manual and other documentation resources online at:
    <http://www.gnu.org/software/gdb/documentation/>.

For help, type "help".
Type "apropos word" to search for commands related to "word"...
Reading symbols from /home/dklein/projects/FairMQ/build/test/testsuite_PluginServices...
[New LWP 508760]
[New LWP 508756]
[New LWP 508758]
[New LWP 508757]
[New LWP 508754]
[New LWP 508759]
[New LWP 508755]

warning: Section `.reg-xstate/508760' in core file too small.
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Core was generated by `build/test/testsuite_PluginServices --gtest_filter=PluginServices.SubscriptionT'.
Program terminated with signal SIGSEGV, Segmentation fault.

warning: Section `.reg-xstate/508760' in core file too small.
#0  std::__detail::_Hash_code_base<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, boost::signals2::connection>, std::__detail::_Select1st, std::hash<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, true>::_M_bucket_index (__bkt_count=13, __n=..., this=0x93b108)
    at /usr/include/c++/13/bits/hashtable_policy.h:1359
1359          _M_bucket_index(const _Hash_node_value<_Value, true>& __n,
[Current thread is 1 (Thread 0x7f308e7fa6c0 (LWP 508760))]
Missing separate debuginfos, use: dnf debuginfo-install fairlogger-1.11.0-1.fc38.x86_64
(gdb) bt
#0  std::__detail::_Hash_code_base<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, boost::signals2::connection>, std::__detail::_Select1st, std::hash<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, true>::_M_bucket_index (__bkt_count=13, __n=..., this=0x93b108)
    at /usr/include/c++/13/bits/hashtable_policy.h:1359
#1  std::_Hashtable<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, boost::signals2::connection>, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, boost::signals2::connection> >, std::__detail::_Select1st, std::equal_to<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::hash<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<true, false, true> >::_M_bucket_index (__n=..., this=0x93b108) at /usr/include/c++/13/bits/hashtable.h:793
#2  std::_Hashtable<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, boost::signals2::connection>, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, boost::signals2::connection> >, std::__detail::_Select1st, std::equal_to<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::hash<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<true, false, true> >::_M_erase
    (this=this@entry=0x93b108, __k="subscriber_5") at /usr/include/c++/13/bits/hashtable.h:2365
#3  0x00007f309495fb81 in std::_Hashtable<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, boost::signals2::connection>, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, boost::signals2::connection> >, std::__detail::_Select1st, std::equal_to<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::hash<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<true, false, true> >::erase (__k="subscriber_5", this=0x93b108) at /usr/include/c++/13/bits/hashtable.h:983
#4  std::unordered_map<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, boost::signals2::connection, std::hash<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::equal_to<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, boost::signals2::connection> > >::erase (__x="subscriber_5",
    this=0x93b108) at /usr/include/c++/13/bits/unordered_map.h:770
#5  fair::mq::StateMachine::UnsubscribeFromStateChange (this=<optimized out>, key="subscriber_5")
    at /home/dklein/projects/FairMQ/fairmq/StateMachine.cxx:327
#6  0x000000000047b3c1 in fair::mq::Device::UnsubscribeFromStateChange (key="subscriber_5", this=<optimized out>)
    at /home/dklein/projects/FairMQ/fairmq/Device.h:569
#7  fair::mq::PluginServices::UnsubscribeFromDeviceStateChange (subscriber="subscriber_5", this=<optimized out>)
    at /home/dklein/projects/FairMQ/fairmq/PluginServices.h:139
#8  operator() (__closure=0x93d0c8) at /home/dklein/projects/FairMQ/test/plugin_services/_control.cxx:163
#9  0x000000000047b46d in std::__invoke_impl<void, (anonymous namespace)::PluginServices_SubscriptionThreadSafety_Test::TestBody()::<lambda()> > (__f=...) at /usr/include/c++/13/bits/invoke.h:60
#10 std::__invoke<(anonymous namespace)::PluginServices_SubscriptionThreadSafety_Test::TestBody()::<lambda()> > (__fn=...)
    at /usr/include/c++/13/bits/invoke.h:96
#11 std::thread::_Invoker<std::tuple<(anonymous namespace)::PluginServices_SubscriptionThreadSafety_Test::TestBody()::<lambda()> > >::_M_invoke<0> (this=<optimized out>) at /usr/include/c++/13/bits/std_thread.h:292
#12 std::thread::_Invoker<std::tuple<(anonymous namespace)::PluginServices_SubscriptionThreadSafety_Test::TestBody()::<lambda()> > >::operator() (this=<optimized out>) at /usr/include/c++/13/bits/std_thread.h:299
#13 std::thread::_State_impl<std::thread::_Invoker<std::tuple<(anonymous namespace)::PluginServices_SubscriptionThreadSafety_Test::TestBody()::<lambda()> > > >::_M_run(void) (this=<optimized out>) at /usr/include/c++/13/bits/std_thread.h:244
#14 0x00007f30940e31b3 in std::execute_native_thread_routine (__p=0x93d0c0)
    at ../../../../../libstdc++-v3/src/c++11/thread.cc:104
#15 0x00007f3093dcd947 in start_thread (arg=<optimized out>) at pthread_create.c:444
#16 0x00007f3093e53860 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
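
The access pattern behind this backtrace (several threads inserting into and erasing from one shared `std::unordered_map`) can be approximated with the standalone sketch below. It is only an approximation under stated assumptions, not the actual SubscriptionThreadSafety test: `RunSubscriptionStress` is a hypothetical helper, the map stores plain `int` values instead of `boost::signals2::connection`, and the `subscriber_N` key format is simply borrowed from the trace.

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

// Hypothetical helper, loosely modelled on the idea of a subscription stress test.
// Each thread repeatedly inserts and erases its own key in a shared map.
// Returns the number of entries left once all threads have joined.
std::size_t RunSubscriptionStress(int numThreads, int iterations)
{
    std::mutex mtx;                                     // guards `subscriptions`
    std::unordered_map<std::string, int> subscriptions; // int stands in for a connection

    std::vector<std::thread> threads;
    threads.reserve(static_cast<std::size_t>(numThreads));
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back([&mtx, &subscriptions, i, iterations] {
            const std::string key = "subscriber_" + std::to_string(i);
            for (int n = 0; n < iterations; ++n) {
                {
                    // "Subscribe": insert under the lock.
                    std::lock_guard<std::mutex> lock(mtx);
                    subscriptions.emplace(key, n);
                }
                {
                    // "Unsubscribe": erase under the lock.
                    std::lock_guard<std::mutex> lock(mtx);
                    subscriptions.erase(key);
                }
            }
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    return subscriptions.size();
}
```

With both locks in place, every thread removes its own key before finishing, so a call such as `RunSubscriptionStress(8, 1000)` leaves the map empty. Dropping either `lock_guard` turns the loop into a data race on the hash table, which is undefined behavior and may or may not crash on a given run.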