dallison / subspace

Subspace IPC
Apache License 2.0

Race when resizing channel across separate clients #20

Open a7g4 opened 8 months ago

a7g4 commented 8 months ago

I've been chasing down a weird crash and I think it's a race when a channel gets resized. The symptom is an error message like this (sometimes with other values) followed by termination:

Invalid buffer index for slot 0: 1

I've since managed to isolate it to channel resizing (I think). This test reproduces the issue in about 1 in 10 runs (it varies: on one of my machines it's 1 in 2; on another it's closer to 1 in 20):

TEST_F(ClientTest, TestCrash) {
  std::string channel_name = "CrashChannel";
  subspace::Client client1;
  subspace::Client client2;
  ASSERT_TRUE(client1.Init(Socket()).ok());
  ASSERT_TRUE(client2.Init(Socket()).ok());

  std::atomic<bool> publisher_finished = false;

  auto t1 = std::thread([&]() {
    auto client1_pub = *client1.CreatePublisher(channel_name, 1, 4);
    for (int i = 1; i < 32; i++) {
      std::size_t size = std::pow(2, i);
      auto buffer = client1_pub.GetMessageBuffer(size);
      std::memset(*buffer, i, size);
      client1_pub.PublishMessage(size);
    }
    publisher_finished = true;
  });
  auto t2 = std::thread([&]() {
    auto client2_sub = *client2.CreateSubscriber(channel_name);
    while (publisher_finished == false) {
      auto message = *client2_sub.ReadMessage();
      size_t size = message.length;
      if (size == 0) {
        continue;
      } else {
        std::cout << size << std::endl;
      }
    }
  });

  t1.join();
  t2.join();
}

This is the output and the stack trace on one machine (macos-aarch64, compiled with clang, running under lldb); this machine crashes about 1 in every 20 runs:

[ RUN      ] ClientTest.TestCrash
2
4096
8192
16384
32768
Process 49232 stopped
* thread #4, stop reason = EXC_BAD_ACCESS (code=1, address=0x60)
    frame #0: 0x00000001000c8c70 client_test`subspace::Channel::NextSlot(subspace::MessageSlot*, bool, int, std::__1::function<bool (subspace::ChannelLock*)>) + 536
client_test`subspace::Channel::NextSlot:
->  0x1000c8c70 <+536>: ldr    x8, [x9, #0x18]
    0x1000c8c74 <+540>: orr    x8, x8, #0x4
    0x1000c8c78 <+544>: str    x8, [x9, #0x18]
    0x1000c8c7c <+548>: ldr    x8, [sp, #0x68]
Target 0: (client_test) stopped.
(lldb) bt
* thread #4, stop reason = EXC_BAD_ACCESS (code=1, address=0x60)
  * frame #0: 0x00000001000c8c70 client_test`subspace::Channel::NextSlot(subspace::MessageSlot*, bool, int, std::__1::function<bool (subspace::ChannelLock*)>) + 536
    frame #1: 0x00000001000a8d4c client_test`subspace::details::SubscriberImpl::NextSlot(std::__1::function<bool (subspace::ChannelLock*)>) + 132
    frame #2: 0x00000001000a897c client_test`subspace::Client::ReadMessageInternal(subspace::details::SubscriberImpl*, subspace::ReadMode, bool, bool) + 332
    frame #3: 0x00000001000a92a8 client_test`subspace::Client::ReadMessage(subspace::details::SubscriberImpl*, subspace::ReadMode) + 408
    frame #4: 0x0000000100011d54 client_test`subspace::Subscriber::ReadMessage(subspace::ReadMode) + 48
    frame #5: 0x000000010006237c client_test`ClientTest_TestCrash_Test::TestBody()::$_7::operator()() const + 192
    frame #6: 0x0000000100062288 client_test`decltype(std::declval<ClientTest_TestCrash_Test::TestBody()::$_7>()()) std::__1::__invoke[abi:v160006]<ClientTest_TestCrash_Test::TestBody()::$_7>(ClientTest_TestCrash_Test::TestBody()::$_7&&) + 24
    frame #7: 0x0000000100062264 client_test`void std::__1::__thread_execute[abi:v160006]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, ClientTest_TestCrash_Test::TestBody()::$_7>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, ClientTest_TestCrash_Test::TestBody()::$_7>&, std::__1::__tuple_indices<>) + 28
    frame #8: 0x0000000100061f6c client_test`void* std::__1::__thread_proxy[abi:v160006]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, ClientTest_TestCrash_Test::TestBody()::$_7>>(void*) + 84
    frame #9: 0x000000018f32d034 libsystem_pthread.dylib`_pthread_start + 136

On a different machine (linux-aarch64, compiled with gcc, running under gdb), which triggers this crash every other run:

[ RUN      ] ClientTest.TestCrash
[New Thread 0xfffff4f0d040 (LWP 84416)]
[New Thread 0xffffeffff040 (LWP 84417)]
Invalid buffer index for slot 0: 1

Thread 4 "client_test" received signal SIGABRT, Aborted.
[Switching to Thread 0xffffeffff040 (LWP 84417)]
__pthread_kill_implementation (threadid=281474708271168, signo=signo@entry=6, no_tid=no_tid@entry=0) at ./nptl/pthread_kill.c:44
44      ./nptl/pthread_kill.c: No such file or directory.
(gdb) bt
#0  __pthread_kill_implementation (threadid=281474708271168, signo=signo@entry=6, no_tid=no_tid@entry=0) at ./nptl/pthread_kill.c:44
#1  0x0000fffff5c0f254 in __pthread_kill_internal (signo=6, threadid=<optimized out>) at ./nptl/pthread_kill.c:78
#2  0x0000fffff5bca67c in __GI_raise (sig=sig@entry=6) at ../sysdeps/posix/raise.c:26
#3  0x0000fffff5bb7130 in __GI_abort () at ./stdlib/abort.c:79
#4  0x0000fffff7ea1980 in subspace::Channel::Buffer(int) const ()
   from /home/eq/.cache/bazel/_bazel_eq/48758b0bb01c6b4a67c45fe1327b09b0/execroot/_main/bazel-out/aarch64-fastbuild/bin/client/../_solib_aarch64/libclient_Slibsubspace_Uclient.so
#5  0x0000fffff7ea1768 in subspace::Channel::Prefix(subspace::MessageSlot*) const ()
   from /home/eq/.cache/bazel/_bazel_eq/48758b0bb01c6b4a67c45fe1327b09b0/execroot/_main/bazel-out/aarch64-fastbuild/bin/client/../_solib_aarch64/libclient_Slibsubspace_Uclient.so
#6  0x0000fffff7e142f4 in subspace::Channel::NextSlot(subspace::MessageSlot*, bool, int, std::function<bool (subspace::ChannelLock*)>) ()
   from /home/eq/.cache/bazel/_bazel_eq/48758b0bb01c6b4a67c45fe1327b09b0/execroot/_main/bazel-out/aarch64-fastbuild/bin/client/../_solib_aarch64/libcommon_Slibsubspace_Ucommon.so
#7  0x0000fffff7ea40d0 in subspace::details::SubscriberImpl::NextSlot(std::function<bool (subspace::ChannelLock*)>) ()
   from /home/eq/.cache/bazel/_bazel_eq/48758b0bb01c6b4a67c45fe1327b09b0/execroot/_main/bazel-out/aarch64-fastbuild/bin/client/../_solib_aarch64/libclient_Slibsubspace_Uclient.so
#8  0x0000fffff7e9b5a0 in subspace::Client::ReadMessageInternal(subspace::details::SubscriberImpl*, subspace::ReadMode, bool, bool) ()
   from /home/eq/.cache/bazel/_bazel_eq/48758b0bb01c6b4a67c45fe1327b09b0/execroot/_main/bazel-out/aarch64-fastbuild/bin/client/../_solib_aarch64/libclient_Slibsubspace_Uclient.so
#9  0x0000fffff7e9b988 in subspace::Client::ReadMessage(subspace::details::SubscriberImpl*, subspace::ReadMode) ()
   from /home/eq/.cache/bazel/_bazel_eq/48758b0bb01c6b4a67c45fe1327b09b0/execroot/_main/bazel-out/aarch64-fastbuild/bin/client/../_solib_aarch64/libclient_Slibsubspace_Uclient.so
#10 0x0000aaaaaaae9248 in subspace::Subscriber::ReadMessage(subspace::ReadMode) ()
#11 0x0000aaaaaaae0ae0 in ClientTest_TestCrash_Test::TestBody()::{lambda()#2}::operator()() const ()
#12 0x0000aaaaaaae70b4 in void std::__invoke_impl<void, ClientTest_TestCrash_Test::TestBody()::{lambda()#2}>(std::__invoke_other, ClientTest_TestCrash_Test::TestBody()::{lambda()#2}&&) ()
#13 0x0000aaaaaaae7004 in std::__invoke_result<ClientTest_TestCrash_Test::TestBody()::{lambda()#2}>::type std::__invoke<ClientTest_TestCrash_Test::TestBody()::{lambda()#2}>(ClientTest_TestCrash_Test::TestBody()::{lambda()#2}&&) ()
#14 0x0000aaaaaaae6f74 in void std::thread::_Invoker<std::tuple<ClientTest_TestCrash_Test::TestBody()::{lambda()#2}> >::_M_invoke<0ul>(std::_Index_tuple<0ul>) ()
#15 0x0000aaaaaaae6f28 in std::thread::_Invoker<std::tuple<ClientTest_TestCrash_Test::TestBody()::{lambda()#2}> >::operator()() ()
#16 0x0000aaaaaaae6ee4 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<ClientTest_TestCrash_Test::TestBody()::{lambda()#2}> > >::_M_run() ()
#17 0x0000fffff5e431fc in ?? () from /lib/aarch64-linux-gnu/libstdc++.so.6
#18 0x0000fffff5c0d5c8 in start_thread (arg=0x0) at ./nptl/pthread_create.c:442
#19 0x0000fffff5c75d9c in thread_start () at ../sysdeps/unix/sysv/linux/aarch64/clone.S:79

mikael-s-persson commented 8 months ago

Friendly ping @dallison, any opinion on what might be causing this?

mikael-s-persson commented 8 months ago

@a7g4 I dug into this a bit, but I'm not able to reproduce the crashes in any config I try (gcc / clang, libstdc++ / libc++, opt / dbg).

But looking at the code and at Dave's fix to the 'reload' function on Monday (see https://github.com/dallison/subspace/commit/8a4bdeb6912288aa4613c506ba70ba1dcebebf4c#diff-0456d178d4819d469423f58832be3dd34231c1325bb7ea678813e48cca5f1b11L99), I suspect this bug is fixed. That reload bug would definitely cause the sort of crash you reported, i.e., the buffer index being out of sync with the buffers_ vector, since the reload is how a subscriber checks whether buffers_ needs updating before it looks at the message slots.

Like I said, I cannot check that, because I cannot reproduce your error.

dallison commented 8 months ago

I've found the issue. Accessing an argument after it has been std::moved. I have a fix.
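
For readers unfamiliar with this class of bug, here is a minimal sketch of the general pattern. It is not the actual subspace code: `ScopedLock`, its `reload` parameter, the `reload_` member, and `DemoReloadRuns` are hypothetical names chosen for illustration. A by-value parameter is moved into a member, and from then on only the member should be used, because the moved-from parameter is left in a valid but unspecified state.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for a lock object that takes a reload callback.
// This is NOT the subspace implementation, only an illustration.
class ScopedLock {
public:
  explicit ScopedLock(std::function<bool()> reload)
      : reload_(std::move(reload)) {
    // Bug pattern: calling `reload()` here would use the moved-from
    // parameter, whose state is valid but unspecified.
    // Correct pattern: use the member that now owns the callable.
    if (reload_) {
      reloaded_ = reload_();
    }
  }

  bool reloaded() const { return reloaded_; }

private:
  std::function<bool()> reload_;
  bool reloaded_ = false;
};

// Returns true when the callback ran exactly once through the member.
inline bool DemoReloadRuns() {
  int calls = 0;
  assert(calls == 0);
  ScopedLock lock([&calls]() {
    ++calls;
    return true;
  });
  return lock.reloaded() && calls == 1;
}
```

Compilers and linters can often flag this pattern (for example clang-tidy's bugprone-use-after-move check), which may be worth enabling if it is not already.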

dallison commented 8 months ago

Sorry for the delay, for some reason I am not getting notified of some issues on GitHub.

a7g4 commented 8 months ago

No worries! Let us know how we can help 🙂

a7g4 commented 8 months ago

> I've found the issue. Accessing an argument after it has been std::moved. I have a fix.

Is that this commit? https://github.com/dallison/subspace/commit/8a4bdeb6912288aa4613c506ba70ba1dcebebf4c

If so, it doesn't seem to resolve the issue 😢

We did notice that this issue seems pretty common on aarch64 but not on x86_64. Does that help identify/diagnose or is there any extra data that we can gather?

mikael-s-persson commented 8 months ago

I did a bit of debugging on our machine. Here is what I've been able to gather so far from running that test in gdb.

Error:

Thread 4 "client_test" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7fb4ae0110 (LWP 13282)]
subspace::Channel::NextSlot(subspace::MessageSlot*, bool, int, std::function<bool (subspace::ChannelLock*)>) (this=0x7fac001058, slot=0x7fb42d32a8, reliable=false, owner=1, reload=...)
    at common/channel.cc:683
683   Prefix(slot)->flags |= kMessageSeen;

Stack trace on all relevant threads:

Thread 4 (Thread 0x7fb4ae0110 (LWP 13282)):
#0  subspace::Channel::NextSlot(subspace::MessageSlot*, bool, int, std::function<bool (subspace::ChannelLock*)>) (this=0x7fac001058, slot=0x7fb42d32a8, reliable=false, owner=1, reload=...)
    at common/channel.cc:683
#1  0x0000007fb7ec35fc in subspace::details::SubscriberImpl::NextSlot(std::function<bool (subspace::ChannelLock*)>) (this=0x7fac001050, reload=...) at ./client/client_channel.h:246
#2  0x0000007fb7ebb550 in subspace::Client::ReadMessageInternal (this=0x7fffffd138, subscriber=0x7fac001050, mode=subspace::ReadMode::kReadNext, pass_activation=false, clear_trigger=true)
    at client/client.cc:454
#3  0x0000007fb7ebb93c in subspace::Client::ReadMessage (this=0x7fffffd138, subscriber=0x7fac001050, mode=subspace::ReadMode::kReadNext) at client/client.cc:535
#4  0x00000055555a0720 in subspace::Subscriber::ReadMessage (this=0x7fb4adf7b0, mode=subspace::ReadMode::kReadNext) at ./client/client.h:707
#5  0x0000005555599408 in ClientTest_TestCrash_Test::<lambda()>::operator()(void) const (__closure=0x55556112f8) at client/client_test.cc:1268

Thread 3 (Thread 0x7fb572d110 (LWP 13281)):
#0  0x0000007fb632f0b4 in __libc_recv (fd=10, buf=0x7fb572c460, len=4, flags=0) at ../sysdeps/unix/sysv/linux/recv.c:28
#1  0x0000007fb6d9261c in toolbelt::ReceiveFully (c=0x0, fd=10, length=4, buffer=0x7fb572c460 "\220\304r\265\177", buflen=4) at external/toolbelt/toolbelt/sockets.cc:90
#2  0x0000007fb6d92c88 in toolbelt::Socket::ReceiveMessage (this=0x7fffffc098, buffer=0x7fffffc0b0 "", buflen=4096, c=0x0) at external/toolbelt/toolbelt/sockets.cc:200
#3  0x0000007fb7ebdb1c in subspace::Client::SendRequestReceiveResponse (this=0x7fffffc078, req=..., response=..., fds=std::vector of length 0, capacity 100) at client/client.cc:953
#4  0x0000007fb7ebd7e4 in subspace::Client::ResizeChannel (this=0x7fffffc078, publisher=0x7fa8001050, new_slot_size=32) at client/client.cc:914
#5  0x0000007fb7eba774 in subspace::Client::GetMessageBuffer (this=0x7fffffc078, publisher=0x7fa8001050, max_size=32) at client/client.cc:261
#6  0x00000055555a0234 in subspace::Publisher::GetMessageBuffer (this=0x7fb572c7c8, max_size=32) at ./client/client.h:589
#7  0x000000555559926c in ClientTest_TestCrash_Test::<lambda()>::operator()(void) const (__closure=0x55556112c8) at client/client_test.cc:1259

Thread 2 (Thread 0x7fb5f36110 (LWP 13280)):
#0  0x0000007fb6327550 in __pthread_mutex_lock_full (mutex=0x7fb42dc080) at pthread_mutex_lock.c:313
#1  0x0000007fb7ec0b0c in subspace::ChannelLock::Lock (this=0x7fb002b3f0) at ./common/channel.h:111
#2  0x0000007fb7e45ad4 in subspace::ChannelLock::ChannelLock(pthread_mutex_t*, std::function<bool (subspace::ChannelLock*)>) (this=0x7fb002b3f0, lock=0x7fb42dc080, reload=...)
    at ./common/channel.h:98
#3  0x0000007fb7e434d4 in subspace::Channel::ExtendBuffers (this=0x7fb004e8d0, new_slot_size=32) at common/channel.cc:382
#4  0x0000007fb7f931c4 in subspace::ClientHandler::HandleResize (this=0x7fb0020240, req=..., response=0x7fb004d920, fds=std::vector of length 0, capacity 0) at server/client_handler.cc:395
#5  0x0000007fb7f914f8 in subspace::ClientHandler::HandleMessage (this=0x7fb0020240, req=..., resp=..., fds=std::vector of length 0, capacity 0) at server/client_handler.cc:87
#6  0x0000007fb7f91000 in subspace::ClientHandler::Run (this=0x7fb0020240, c=0x7fb00212a0) at server/client_handler.cc:29
#7  0x0000007fb7f72d04 in subspace::Server::<lambda(co::Coroutine*)>::operator()(co::Coroutine *) const (__closure=0x7fb00212b0, c=0x7fb00212a0) at server/server.cc:278

Some stack variables printed out from gdb:

(gdb) print slot->id
$2 = 3
(gdb) print slot->message_size
$3 = 16
(gdb) print slot->buffer_index
$4 = 4
(gdb) print ccb_->num_buffers
$11 = 5
(gdb) print buffers_.size()
$7 = 5
(gdb) print buffers_[0].buffer
$12 = 0x0
(gdb) print buffers_[1].buffer
$13 = 0x7fb42d7000 ""
(gdb) print buffers_[2].buffer
$14 = 0x7fb42cf000 "\001"
(gdb) print buffers_[3].buffer
$15 = 0x7fb42cd000 "\001"
(gdb) print buffers_[4].buffer
$16 = 0x0
(gdb) print buffers_[0].slot_size
$17 = 0
(gdb) print buffers_[1].slot_size
$18 = 2
(gdb) print buffers_[2].slot_size
$19 = 4
(gdb) print buffers_[3].slot_size
$20 = 8
(gdb) print buffers_[4].slot_size
$21 = 0

Observations:

Currently, I'm suspicious of client_handler.cc:404, which calls channel->AddBuffer without a ChannelLock. On aarch64, which lacks the strong memory ordering that x86 provides, this might let a reader see those writes out of order.
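
As a general illustration of that concern (again, not the subspace code; `SharedTable`, `AddBuffer`, and `TryGetLast` are hypothetical names, and a single writer is assumed), the sketch below shows one common way to make such an update safe without a lock. The writer fills in the entry first and then publishes the new count with release semantics; the reader loads the count with acquire semantics before touching the entry. Holding the same lock on both sides would achieve the same ordering.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical single-writer table, for illustration only.
struct SharedTable {
  std::array<int, 8> slot_sizes{};
  std::atomic<std::size_t> num_buffers{0};

  // Writer: store the entry, then publish the count with a release store
  // so the entry is visible to any reader that observes the new count.
  bool AddBuffer(int slot_size) {
    assert(slot_size > 0);
    std::size_t n = num_buffers.load(std::memory_order_relaxed);
    if (n == slot_sizes.size()) {
      return false;  // table is full
    }
    slot_sizes[n] = slot_size;
    num_buffers.store(n + 1, std::memory_order_release);
    return true;
  }

  // Reader: the acquire load pairs with the release store above, so the
  // entry at index n - 1 is fully written by the time it is read.
  bool TryGetLast(int* out) const {
    std::size_t n = num_buffers.load(std::memory_order_acquire);
    if (n == 0) {
      return false;  // nothing published yet
    }
    *out = slot_sizes[n - 1];
    return true;
  }
};
```

With plain (non-atomic) stores and no lock, nothing prevents a weakly ordered CPU from making the count visible before the entry, which is the kind of inconsistency suspected here.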

mikael-s-persson commented 8 months ago

I found the bug.

It was a race condition between two steps: the publisher incrementing the ref count on the new (resized) slot (at https://github.com/dallison/subspace/blob/main/client/client.cc#L264, via the SetSlotToBiggestBuffer call made after the server replies to the resize request), and the subscriber unmapping unused buffers (see https://github.com/dallison/subspace/blob/main/common/channel.cc#L432C15-L432C33, via the UnmapUnusedBuffers function). Basically, the subscriber might come around to clean up unused buffers right after the server has resized the channel but before the publisher has claimed the new slot. Kaboom.

I can fix it pretty easily by never cleaning the last buffer (i.e., always assume that the largest buffer is in use, even if its ref count is zero). I'll make a PR soon.
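
A minimal sketch of that idea, assuming a simplified bookkeeping structure: `BufferInfo` and its fields are hypothetical, and the function body is not the real UnmapUnusedBuffers from channel.cc. The cleanup loop stops one element early, so the last (largest) buffer stays mapped even when its ref count is still zero.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical per-buffer bookkeeping; not the subspace data layout.
struct BufferInfo {
  int refs = 0;
  bool mapped = true;
};

// Unmaps every zero-ref buffer except the last one, which is assumed to be
// in use because a publisher may not have claimed it yet.
// Returns the number of buffers unmapped by this call.
inline int UnmapUnusedBuffers(std::vector<BufferInfo>& buffers) {
  int unmapped = 0;
  // `i + 1 < size()` skips the final element and is safe for empty vectors.
  for (std::size_t i = 0; i + 1 < buffers.size(); ++i) {
    if (buffers[i].refs == 0 && buffers[i].mapped) {
      buffers[i].mapped = false;  // a real implementation would munmap here
      ++unmapped;
    }
  }
  return unmapped;
}
```

The cost is that one possibly unused buffer stays mapped a little longer than strictly necessary.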

mikael-s-persson commented 8 months ago

I guess in theory there is still a race condition: if multiple publishers are resizing at the same time, there could be multiple buffers at the tail end of the channel that don't yet have a ref count. Maybe the proper solution is for the server to set the ref count to 1 when extending the buffers, instead of having the publishers increment it after the reply.
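
A rough sketch of that alternative, under the same caveat that none of these names come from subspace: `TrackedBuffer`, `ExtendWithClaim`, `UnmapZeroRefBuffers`, and `AdoptBuffer` are hypothetical. The server creates the new buffer with one reference already held on behalf of the requesting publisher, so a cleanup pass that runs before the publisher sees the reply finds nothing to unmap.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical per-buffer bookkeeping; not the subspace data layout.
struct TrackedBuffer {
  int refs = 0;
  bool mapped = true;
};

// Server side: the new buffer starts with refs == 1, claimed for the
// publisher that asked for the resize. Returns the new buffer's index.
inline std::size_t ExtendWithClaim(std::vector<TrackedBuffer>& buffers) {
  TrackedBuffer b;
  b.refs = 1;
  buffers.push_back(b);
  return buffers.size() - 1;
}

// Subscriber side: plain cleanup with no special case for the last buffer.
inline int UnmapZeroRefBuffers(std::vector<TrackedBuffer>& buffers) {
  int unmapped = 0;
  for (TrackedBuffer& b : buffers) {
    if (b.refs == 0 && b.mapped) {
      b.mapped = false;  // a real implementation would munmap here
      ++unmapped;
    }
  }
  return unmapped;
}

// Publisher side: adopts the reference the server already took instead of
// incrementing. Returns true if the buffer is still mapped and referenced.
inline bool AdoptBuffer(const std::vector<TrackedBuffer>& buffers,
                        std::size_t index) {
  return index < buffers.size() && buffers[index].mapped &&
         buffers[index].refs >= 1;
}
```

Because each resize request creates its own pre-claimed buffer, this would also cover several publishers resizing at once, though the real protocol would need to release that reference if the publisher goes away before adopting it.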