I've been trying to integrate asio into my experimental coroutine actor library (which is meant to make it easy to write thread-safe code with coroutines without using locks, even internally), and I needed to pass cancellation from actors (which currently use `stop_token`s) to asio, which sent me down the rabbit hole of discovering how cancellation in asio works. Since asio has operators `&&` and `||` for awaitables, I assumed they are thread-safe, and indeed, reading the code, `parallel_group` uses atomics to synchronize, as if it expects handlers to run in parallel, emits the cancellation signal only once, etc. To me this shows that `parallel_group` is designed and intended to be thread-safe; why else complicate it with atomics?
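To spell out what I mean by the atomics, here's a heavily simplified model of "first completion cancels the rest" bookkeeping. This is not asio's `parallel_group` code: `group_state`, `on_child_complete`, `cancel_others` and `simulate` are hypothetical names, and the real implementation tracks more state. It only illustrates why a one-shot `exchange` plus a completion counter are needed only if children can complete on different threads.

```c++
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical model of a "wait for one" group (like `a || b`), NOT asio's
// parallel_group implementation. Each child reports completion exactly once,
// possibly from a different thread.
struct group_state {
    explicit group_state(std::size_t children) : remaining(children) {}

    std::atomic<std::size_t> remaining;         // children still running
    std::atomic<bool> cancel_requested{false};  // one-shot "cancel the others" flag
    std::atomic<int> cancellations_emitted{0};  // how often cancel_others() ran

    // Called by each child when it finishes. Returns true if this call was
    // the last completion, i.e. the caller should complete the whole group.
    bool on_child_complete() {
        // exchange() guarantees only the first finisher emits cancellation,
        // even if several children finish concurrently on different threads.
        if (!cancel_requested.exchange(true, std::memory_order_acq_rel)) {
            cancel_others();
        }
        return remaining.fetch_sub(1, std::memory_order_acq_rel) == 1;
    }

    void cancel_others() {
        // A real implementation would call emit() on the other children's
        // cancellation signals here; this model only counts the calls.
        cancellations_emitted.fetch_add(1, std::memory_order_relaxed);
    }
};

// Completes `children` children, each on its own thread, and returns how many
// of them were told they were the last one.
int simulate(std::size_t children, group_state& g) {
    std::atomic<int> last_count{0};
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < children; ++i) {
        threads.emplace_back([&] {
            if (g.on_child_complete()) last_count.fetch_add(1);
        });
    }
    for (auto& t : threads) t.join();
    return last_count.load();
}
```

With four children finishing on four threads, exactly one of them emits cancellation and exactly one observes that it was the last.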
However, the per-object cancellation docs say you must be careful when emitting cancellation to composed operations, yet examples of asio awaitables (in YouTube videos, etc.) like to show `asio::async_write`, which is actually a composed operation. Looking at how `async_write` is implemented, I found it uses a cancellation state (like many other places in asio), which is not thread-safe: internal handlers call `cancelled()` and hand the cancellation state's slot to the next sub-operation when starting it, which is a race if such a handler runs in parallel with someone else calling `emit`. Clearly you cannot do that and must synchronize in some way, e.g. with a strand.
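Here's a deliberately simplified, single-threaded model of that pattern, with hypothetical names (`cancel_type`, `slot_model`, `composed_op_model`); asio's actual `cancellation_state` and `cancellation_slot` are more involved. What matters is that both the recorded cancellation type and the installed handler are plain members with no synchronization, so calling `emit()` from another thread while `step()` runs would be a data race.

```c++
#include <cassert>
#include <functional>

// Hypothetical stand-in for asio::cancellation_type (only two values here).
enum class cancel_type { none = 0, terminal = 1 };

// Hypothetical model of a cancellation slot: holds at most one handler.
// Nothing here is atomic or locked, mirroring the "not thread-safe" contract.
struct slot_model {
    std::function<void(cancel_type)> handler;

    void emit(cancel_type t) {
        if (handler) handler(t);  // reads `handler` without synchronization
    }
};

// Hypothetical model of a composed operation (think async_write): it records
// cancellation in a plain field and re-installs a handler into the shared
// slot every time it starts the next sub-operation.
struct composed_op_model {
    slot_model& slot;
    cancel_type cancelled = cancel_type::none;  // plain field, no atomics
    int steps_done = 0;

    explicit composed_op_model(slot_model& s) : slot(s) {}

    // One intermediate completion handler. Returns false once cancelled.
    bool step() {
        // Like checking cancelled(): an unsynchronized read.
        if (cancelled != cancel_type::none) return false;
        ++steps_done;
        // Hand the slot to the "next sub-operation": this write to
        // slot.handler races with emit() if emit() runs on another thread.
        slot.handler = [this](cancel_type t) { cancelled = t; };
        return true;
    }
};
```

Used from a single thread (or from handlers serialized on one strand), this is fine: after two steps, an `emit(cancel_type::terminal)` is recorded and the next `step()` stops.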
I searched to see whether `parallel_group` or something else in asio actually synchronizes cancellation when awaiting composed operations, maybe with a hidden strand somewhere, but couldn't find anything. For integration purposes this seems OK: I could wrap asio operations in some kind of strand executor and be done with it. But it felt wrong. Why would `parallel_group` use atomics and then just call `emit` on operations, when that is unsafe for composed operations? Worse, all awaitables are composed operations (they have a cancellation state internally), so any use of `&&` or `||` must either be single-threaded or use a strand for itself and all downstream operations. Is that really practical, though? This feels even more wrong, and I wonder whether people really always use strands with awaitables in practice.
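For reference, this is roughly the guarantee a strand gives and that I'd be relying on: handlers posted through it never run concurrently, so an `emit` posted through the same strand can't overlap a composed operation's handler. The `mini_strand` class below is a hypothetical, deliberately simplified model (the posting thread drains the queue inline), not how `asio::strand` is implemented. In asio itself I assume this would mean spawning the top-level coroutine on something like `asio::make_strand(pool)` (with `pool` an `asio::thread_pool`) rather than on the pool directly, so that the children of `&&`/`||` run on the same strand.

```c++
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical, simplified strand model (NOT asio::strand): tasks posted from
// any thread run one at a time, in FIFO order. Whichever thread finds the
// strand idle drains the queue inline; other posters just enqueue and return.
class mini_strand {
public:
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(task));
            if (running_) return;  // another thread is draining and will run it
            running_ = true;
        }
        for (;;) {
            std::function<void()> next;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (queue_.empty()) {
                    running_ = false;  // cleared under the lock, so the next
                    return;            // drainer is ordered after our tasks
                }
                next = std::move(queue_.front());
                queue_.pop_front();
            }
            next();  // runs outside the lock, never concurrently with another task
        }
    }

private:
    std::mutex mutex_;
    std::deque<std::function<void()>> queue_;
    bool running_ = false;
};

// Two threads post non-atomic increments through the strand; because tasks
// are serialized, the plain int needs no atomics.
int serialized_increments(int per_thread) {
    mini_strand strand;
    int counter = 0;  // deliberately not atomic
    auto work = [&] {
        for (int i = 0; i < per_thread; ++i) strand.post([&] { ++counter; });
    };
    std::thread a(work), b(work);
    a.join();
    b.join();
    return counter;
}
```

The cost is that every handler sharing the strand is serialized, which is exactly what makes it feel heavy-handed as the default for `&&`/`||`.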
Failing to find some hidden associated executor that synchronizes composed operations with cancellation, I decided to write a test for this. Sure enough, here's a minimal example that shows the cancellation data race when compiled with `-fsanitize=thread`:
test_cancel.cpp
```c++
#include <stdio.h>   // printf
#include <unistd.h>  // usleep
#include <asio.hpp>
#include <asio/experimental/awaitable_operators.hpp>

using asio::awaitable;
using asio::use_awaitable;
using asio::co_spawn;
using asio::detached;
using namespace asio::experimental::awaitable_operators;

awaitable<int> return_immediately() {
    printf("returning value\n");
    co_return 42;
}

awaitable<int> return_after_long_sleep() {
    // we want to run in another thread when cancelled
    printf("rescheduling...\n");
    co_await asio::post(co_await asio::this_coro::executor, use_awaitable);
    // use usleep because it is not a synchronization primitive
    printf("sleeping...\n");
    usleep(1000000);
    // this co_await checks cancelled() and triggers the race
    printf("calling return_immediately\n");
    co_return co_await return_immediately();
}

awaitable<int> return_after_short_sleep() {
    // we want to block the current thread to make sure the other coroutine
    // starts sleeping in a different thread, instead of being executed
    // immediately after we return.
    usleep(500000);
    co_return co_await return_immediately();
}

awaitable<void> composed() {
    printf("composed starting\n");
    co_await (
        return_after_long_sleep() ||
        return_after_short_sleep()
    );
    printf("composed finished\n");
}

int main() {
    asio::thread_pool pool(2);
    printf("starting...\n");
    co_spawn(pool, composed(), detached);
    pool.join();
    return 0;
}
```
Result when running on a MacBook Air (M1):
```
% ./a.out
a.out(79777,0x1ffc62080) malloc: nano zone abandoned due to inability to reserve vm space.
starting...
composed starting
rescheduling...
sleeping...
returning value
calling return_immediately
==================
WARNING: ThreadSanitizer: data race (pid=79777)
Read of size 4 at 0x000106601720 by thread T2:
#0 auto asio::detail::awaitable_frame_base::await_transform(asio::awaitable) const awaitable.hpp:171 (a.out:arm64+0x100005004) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
#1 return_after_long_sleep() (.resume) test_cancel.cpp:28 (a.out:arm64+0x10001be5c) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
#2 asio::detail::awaitable_thread::pump() awaitable.hpp:768 (a.out:arm64+0x100007674) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
#3 void asio::detail::executor_function::complete>, std::__1::allocator>(asio::detail::executor_function::impl_base*, bool) executor_function.hpp:115 (a.out:arm64+0x100007b3c) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
#4 void asio::thread_pool::basic_executor_type, 8u>::do_execute(asio::detail::executor_function&&, std::__1::integral_constant) const thread_pool.hpp:121 (a.out:arm64+0x100012ce0) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
...
Previous write of size 4 at 0x000106601720 by thread T1:
#0 asio::detail::cancellation_handler, asio::cancellation_filter<(asio::cancellation_type)1>>>::call(asio::cancellation_type) cancellation_signal.hpp:75 (a.out:arm64+0x10000f278) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
#1 asio::detail::cancellation_handler, std::exception_ptr, int, std::exception_ptr, int), asio::any_io_executor>, asio::deferred_async_operation, asio::detail::awaitable_as_function>, asio::deferred_async_operation, asio::detail::awaitable_as_function>>, asio::any_io_executor, void>>::call(asio::cancellation_type) cancellation_signal.hpp:75 (a.out:arm64+0x100016c9c) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
#2 asio::detail::binder0 asio::detail::co_spawn_entry_point, std::exception_ptr, int, std::exception_ptr, int), asio::any_io_executor>, asio::deferred_async_operation, asio::detail::awaitable_as_function>, asio::deferred_async_operation, asio::detail::awaitable_as_function>>, asio::any_io_executor, asio::detail::awaitable_as_function>(asio::awaitable*, asio::detail::co_spawn_state, std::exception_ptr, int, std::exception_ptr, int), asio::any_io_executor>, asio::deferred_async_operation, asio::detail::awaitable_as_function>, asio::deferred_async_operation, asio::detail::awaitable_as_function>>, asio::any_io_executor, asio::detail::awaitable_as_function, void>)::'lambda'()>::operator()() bind_handler.hpp:59 (a.out:arm64+0x10001a628) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
#3 void asio::detail::executor_function::complete asio::detail::co_spawn_entry_point, std::exception_ptr, int, std::exception_ptr, int), asio::any_io_executor>, asio::deferred_async_operation, asio::detail::awaitable_as_function>, asio::deferred_async_operation, asio::detail::awaitable_as_function>>, asio::any_io_executor, asio::detail::awaitable_as_function>(asio::awaitable*, asio::detail::co_spawn_state, std::exception_ptr, int, std::exception_ptr, int), asio::any_io_executor>, asio::deferred_async_operation, asio::detail::awaitable_as_function>, asio::deferred_async_operation, asio::detail::awaitable_as_function>>, asio::any_io_executor, asio::detail::awaitable_as_function, void>)::'lambda'()>, std::__1::allocator>(asio::detail::executor_function::impl_base*, bool) executor_function.hpp:115 (a.out:arm64+0x10001a7c8) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
#4 void asio::thread_pool::basic_executor_type, 8u>::do_execute(asio::detail::executor_function&&, std::__1::integral_constant) const thread_pool.hpp:121 (a.out:arm64+0x100012ce0) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
...
As if synchronized via sleep:
#0 usleep :83895840 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x2e34c) (BuildId: 981013a59ee23029b2ed90b76951327532000000200000000100000000000b00)
#1 return_after_long_sleep() (.resume) test_cancel.cpp:25 (a.out:arm64+0x10001ba2c) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
#2 asio::detail::awaitable_thread::pump() awaitable.hpp:768 (a.out:arm64+0x100007674) (BuildId: 7b686d5920b23716a9eaa3d4903b800d32000000200000000100000000000d00)
...
SUMMARY: ThreadSanitizer: data race awaitable.hpp:171 in auto asio::detail::awaitable_frame_base::await_transform(asio::awaitable) const
==================
composed finished
ThreadSanitizer: reported 1 warnings
```