Open pfeatherstone opened 1 year ago
This description is a bit vague. What kind of thread pool are you using? Why not go for asio::thread_pool? What's the concurrency_hint you pass to the io_context?
I'm doing the following:
asio::io_context ioc;
...
std::vector<std::thread> pool;
for (size_t i = 0 ; i < (nthreads - 1) ; ++i)
pool.emplace_back([&]{ioc.run();});
ioc.run();
for (auto& t : pool)
t.join();
I didn't know there was a concurrency hint. I'll try that.
Or just use asio::thread_pool?
What's the benefit of using that?
I don't mind using that object other than I have to change all my strand types, but if it's the case that it's always better to use that than your own thread pool then that should be clear in the docs.
The concurrency hint isn't clear either. I'll test tomorrow with asio's thread pool.
#include <thread>
#include <chrono>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
using namespace std;
using namespace std::chrono;
using namespace std::chrono_literals;
namespace asio = boost::asio;
namespace beast = boost::beast;
struct device
{
enum state_t { posting, reading, complete };
asio::strand<asio::io_context::executor_type> executor_device;
// Some synchronous device
device(asio::io_context& ctx) : executor_device(ctx.get_executor()) {}
template <
class CompletionToken
>
auto async_read(
CompletionToken&& token
)
{
return asio::async_compose<CompletionToken, void(boost::system::error_code)> (
[this, state = posting](auto& self, boost::system::error_code ec = {}) mutable
{
if (ec)
state = complete;
switch (state)
{
case posting:
state = reading;
asio::post(executor_device, std::move(self));
break;
case reading:
state = complete;
// Do some synchronous operation on some device
cout << "Reading thread id " << std::this_thread::get_id() << endl;
// Intentional fall-through: the operation completes right after the read
case complete:
self.complete(ec);
break;
}
},
token, executor_device
);
}
};
struct processing_state
{
asio::strand<asio::io_context::executor_type> executor_proc;
device dev;
processing_state(
asio::io_context& ctx
) : executor_proc(ctx.get_executor()),
dev{ctx}
{
}
void start()
{
next_read();
}
void next_read()
{
dev.async_read(asio::bind_executor(executor_proc, beast::bind_front_handler(&processing_state::loop, this)));
}
void loop(boost::system::error_code ec)
{
if (ec)
{
cout << "Error : " << ec.message() << endl;
return;
}
// Launch next async op and come back on this executor
next_read();
// Do some work on read stuff
cout << "Processing thread id " << std::this_thread::get_id() << endl;
// Wait a bit
std::this_thread::sleep_for(2s);
}
};
int main()
{
boost::asio::io_context ctx;
processing_state state(ctx);
state.start();
std::thread t{[&] {ctx.run();}};
ctx.run();
t.join();
}
@klemens-morgenstern This is an example of what I'm talking about. It simulates an async wrapper around a synchronous device. A synchronous device is paired with a strand. An async operation on the device first posts the handler onto the device's strand, then does a read (except here it doesn't), then completes the op. In this example, you complete the op back onto a processing strand. I've added some print statements to show the thread IDs. I have two threads running, main thread and custom thread. I would hope that asio would pair each strand with its own thread. But it doesn't.
In my actual use-case, I'm wrapping a libuhd device. The idea is to asynchronously read some samples, then once the read op is complete, schedule the next read, then process the currently read samples. However, everything ends up happening on the same thread, I get no parallelism, and my buffers overflow. If Asio realised that there were enough threads for each strand, everything would run in parallel and I would be able to process the samples coming off the SDR in real time.
By the way, I get the same behaviour when using an asio::thread_pool with 2 threads. All ops happen on the same thread.
@klemens-morgenstern any help on this would be massively appreciated. I might have to resort back to manual multi-threading for now. Bit of a shame as I really like the asio model. Theoretically it does everything i need very simply and I don't have to do any synchronization. However, at the moment it looks like it's not parallelising anything even with a threadpool.
boost::asio::io_context ctx{2u};
^ does that do anything?
I'll check tomorrow. But trying with asio::thread_pool gives same result
I don't know if I'm misusing the API somehow or does Asio not give any guarantees on how thread pools will be used ? It's a shame if you have a multi-core machine and asio only ever uses 1 thread despite user efforts to give it more
@klemens-morgenstern In this example, i've changed it slightly so that each strand is doing some heavy stuff, i.e. while-looping for 2s. I wanted to make sure that it had nothing to do with whether or not a thread was "busy"
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
using namespace std;
using namespace std::chrono;
using namespace std::chrono_literals;
namespace asio = boost::asio;
namespace beast = boost::beast;
template <
class Executor = asio::any_io_executor
>
struct device
{
enum state_t { posting, reading, complete };
Executor executor_device;
template <
class CompletionToken
>
auto async_read(
CompletionToken&& token
)
{
return asio::async_compose<CompletionToken, void(boost::system::error_code)> (
[this, state = posting](auto& self, boost::system::error_code ec = {}) mutable
{
if (ec)
state = complete;
if (state == posting)
{
state = reading;
asio::post(executor_device, std::move(self));
return;
}
if (state == reading)
{
state = complete;
cout << "Reading thread id " << std::this_thread::get_id() << endl;
// Do some crazy operation
const auto start = high_resolution_clock::now();
while ((high_resolution_clock::now() - start) < 2s);
cout << "Reading complete" << endl;
}
if (state == complete)
{
self.complete(ec);
}
},
token, executor_device
);
}
};
template <
class Executor = asio::any_io_executor
>
struct processing_state
{
Executor executor_proc;
device<Executor> dev;
void start()
{
next_read();
}
void next_read()
{
dev.async_read(asio::bind_executor(executor_proc, beast::bind_front_handler(&processing_state::loop, this)));
}
void loop(boost::system::error_code ec)
{
if (ec)
{
cout << "Error : " << ec.message() << endl;
return;
}
// Launch next async op and come back on this executor
next_read();
// Do some work on read stuff
cout << "Processing thread id " << std::this_thread::get_id() << endl;
const auto start = high_resolution_clock::now();
while ((high_resolution_clock::now() - start) < 2s);
cout << "Processing complete" << endl;
}
};
int main()
{
using strand = asio::strand<asio::io_context::executor_type>;
const size_t nthreads = 2;
asio::io_context ctx(nthreads);
std::vector<std::thread> pool;
processing_state<strand> state{asio::make_strand(ctx), {asio::make_strand(ctx)}};
// processing_state<asio::io_context::executor_type> state{ctx.get_executor(), {ctx.get_executor()}};
state.start();
for (size_t i = 0 ; i < (nthreads - 1) ; ++i)
pool.emplace_back([&]{ctx.run();});
ctx.run();
for (auto& t : pool)
t.join();
}
I did what you suggested, and STILL, both strands are running on the same thread.
If however you replace
processing_state<strand> state{asio::make_strand(ctx), {asio::make_strand(ctx)}};
with
processing_state<asio::io_context::executor_type> state{ctx.get_executor(), {ctx.get_executor()}};
then you max out both threads. In this particular example, that's fine, but in general, you don't get thread safety.
I do feel like this is a massive flaw in Asio. It looks like a strand is associated with a particular thread on construction. If it just so happens that the same thread is picking up all strands all the time, then you're buggered. You get no parallelism whatsoever.
Hi @pfeatherstone,
I'm sorry if I missed something, but I don't see a reason to expect different threads to be used in your example. The example uses the same instance of strand (the one which is stored in processing_state::executor_proc) for the device::async_read method and (it looks like; I'm new to the "recent" changes in Asio, made when its author started adapting Asio for the TS, and I haven't yet checked what beast::bind_front_handler does) for the processing_state::loop method. Consider reading the documentation for the asio::bind_executor function, e.g. https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/reference/bind_executor.html.
When asio::post is called in the device::async_read method, the 2nd argument of the asio::post call (std::move(self)) seems to have an executor assigned (the instance of strand stored in processing_state::executor_proc), so Asio executes the completion handler within the instance of strand stored in device::executor_device and, within that execution context (in that instance of strand), executes processing_state::loop within the instance of strand stored in processing_state::executor_proc. Because the device::async_read method is executed within the instance of strand stored in processing_state::executor_proc, Asio may execute the completion handler posted by asio::post within the same thread immediately after return from the device::async_read method.
Under the hood strand is implemented as a queue, so asio::post (may) just add a handler to that queue. Once one handler executed within strand completes, if the internal queue of strand is not empty, it can lead to immediate execution of the next handler from that queue within the same thread.
Could you please try to modify your example to use multiple instances of processing_state and device (multiple pairs, say the number of pairs is the number of threads in the pool x 2)?
Also note (it's definitely not your case, but you need to know) that under the hood different strand instances can use the same implementation instance, because each strand requires a mutex (it looks like strand can be implemented in a lock-free way, but it will be LIFO, while strand guarantees FIFO; refer to the respective discussion in the mailing list) and there is no way to provide one mutex per strand instance (not shared with other instances of strand) without exhausting OS resources under heavy load (when thousands of strand instances are created). Refer to https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/reference/io_context__strand.html#boost_asio.reference.io_context__strand.remarks.
Thank you.
Refer to https://github.com/mabrarov/asio-1240/compare/feature/parallel_processing, where the output on the master branch is ("processing" of the asynchronous operation result, Processing thread ID, is always performed in the same thread as "reading" for the same operation, Reading thread ID):
0: Posting thread ID: 9692
0: Reading thread ID: 9692
0: Reading completed
1: Posting thread ID: 9692
0: Processing thread ID: 9692
0: Processing completed
1: Reading thread ID: 6288
1: Reading completed
2: Posting thread ID: 6288
1: Processing thread ID: 6288
1: Processing completed
2: Reading thread ID: 9692
2: Reading completed
3: Posting thread ID: 9692
2: Processing thread ID: 9692
2: Processing completed
...
while the output on the feature/parallel_processing branch is ("processing" of the asynchronous operation result is sometimes performed in a different thread than "reading" for the same operation):
0: Posting thread ID: 9040
0: Reading thread ID: 9040
0: Reading completed
1: Posting thread ID: 108
0: Processing thread ID: 108
1: Reading thread ID: 9040
0: Processing completed
1: Reading completed
2: Posting thread ID: 108
2: Reading thread ID: 9040
1: Processing thread ID: 108
2: Reading completed
1: Processing completed
3: Posting thread ID: 9040
3: Reading thread ID: 108
2: Processing thread ID: 9040
2: Processing completed
...
Tested with Boost 1.81.0 and MSVC 2019 on Windows 10 x64.
So it looks like the only real change is replacing asio::bind_executor() with an asio::defer().
That seems to work, i.e max both threads, but I'm not convinced it's doing everything I want i.e.
@mabrarov thank you for looking into this by the way!
@klemens-morgenstern can you look at this? Would you say it's ok to use asio::defer() instead of asio::bind_executor() to come back onto a particular strand?
Hi @pfeatherstone,
Let me (try to) answer your questions based on https://github.com/mabrarov/asio-1240/blob/feature/parallel_processing/main.cpp.
tie all processing to one strand
All calls of the processing_state::next_read method, except the 1st one initiated by processing_state::start, are performed within an explicit strand. The 1st and the 2nd calls of the processing_state::next_read method are performed within an "implicit strand", though there is a chance that the 2nd call can return before the 1st call returns, but it's safe, because the processing_state::next_read method doesn't have logic (doesn't access members of the processing_state class) after calling the device::async_read method.
Refer to the chat_session::start and chat_session::do_read_header methods in https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/example/cpp11/chat/chat_server.cpp for a similar example of an implicit strand.
For 100% safety (in case of future modification of the processing_state::next_read method), processing_state::start can use asio::dispatch or asio::defer or asio::post to wrap the 1st call of the processing_state::next_read method with the explicit strand (processing_state::executor).
tie all reading to one strand
I'm not familiar with the asio::async_compose function, but according to the documentation it seems that all accesses to the members of the device instance (after it is created, i.e. excluding the constructor of device) are wrapped with the explicit strand (device::executor).
thread-safety
If I'm not wrong with previous outcomes, then:
processing_state class.
device class.
processing_state class instance can be performed in parallel to access to members of device class instance (I guess this is the intended behavior of this GitHub issue).
If you assume the same under thread-safety, then you have it.
maximise all threads
There are 2 strands in your modified example (which link I shared at the beginning of this message) and the logic wrapped with these strands cannot consume more than 2 threads. It means that for this example you have achieved maximum of possible parallelization.
Thank you.
@mabrarov Thank you for your answer. I'll consume the information for a bit. Though I still don't quite understand why what I had before didn't use all threads, other than Asio doesn't do anything clever when tying a strand to a thread.
The bind_executor() made sure that the completion handler was always invoked on the processing strand and the composed operation made sure that all "reads" happened on the read strand. Both strands were separate instances and I had two threads. So enough capacity for each strand to have its own thread. According to the Asio docs it should work...
For now I'm gonna remove asio executors from my code and revert back to manual threading. I have control over what's going on and can guarantee that processing and reading are done on their own thread. I have to go back to using mutexes, condition variables, etc but so be it.
Hi @pfeatherstone,
Consider reading the documentation for executors and the asio::bind_executor function to clarify the additional (!) guarantees which it gives (compared to asio::defer or asio::dispatch). In fact, https://github.com/mabrarov/asio-1240/blob/main/main.cpp wraps reading with 2 strands, where the last one (the top in the call stack) is the processing_state::executor strand.
Following code:
asio::dispatch(strand2, asio::bind_executor(strand1, func));
Can lead to a call stack like:
or even to a call stack like (depends on busyness of strand2):
because the asio::strand class honors the original executor assigned with func.
Hi @mabrarov,
So you're saying that the completion handler processing_state::loop is running in executor_proc, but because it was initiated from executor_device it is also serialized/sequential with all other handlers running on executor_device? And that's why they are not running in parallel?
Hi @pfeatherstone,
I mean the usage of asio::bind_executor in https://github.com/mabrarov/asio-1240/blob/main/main.cpp leads to processing_state::executor acting as strand1 (and self acting as func) in my previous comment. This leads to the same strand being used for https://github.com/mabrarov/asio-1240/blob/main/main.cpp#L101 and for https://github.com/mabrarov/asio-1240/blob/main/main.cpp#L35.
Hi @mabrarov,
Gotcha. Sorry for the dumb questions. But then https://github.com/mabrarov/asio-1240/blob/7d7946bd76a61dc5dfb3e00a28c39e95ef43aca3/main.cpp#L50 will post to executor_device, so couldn't that then run on the other thread in theory?
Hi @pfeatherstone,
asio::post also honors the original executor, and self has 2 nested executors assigned, where device::executor acts as strand2 and processing_state::executor acts as strand1 in that comment.
Another thread: yes (depends on the implementation of de-queuing in asio::strand). Another strand (compared to processing_state::executor): no. In total, https://github.com/mabrarov/asio-1240/blob/main/main.cpp#L35 and https://github.com/mabrarov/asio-1240/blob/main/main.cpp#L102 run within the same explicit strand, so only one thread (of 2) is consumed at any moment.
Right. Thank you. I get it. I think Asio is the wrong way to do this stuff then. Using 2 manual threads, one for read and one for processing is what I want. This clears things up I think. Asio is a lot more subtle than I thought.
Hi @pfeatherstone,
Asio can be the correct way to solve your task (refer to https://github.com/mabrarov/asio-1240/blob/feature/parallel_processing/main.cpp), and you were very close to it :)
The solution with 2 manual threads is almost the same and has both pros:
device class instance are always accessed from the same thread(s) and members of processing_state class instance are always accessed from the same thread(s), which are different from the thread(s) accessing members of device class instance)
and cons:
BTW, I use similar logic in https://github.com/mabrarov/asio_samples/tree/master/examples/ma_echo_server (it looks complicated and ugly, because it supports both recent and old versions of Asio and even C++03), where
asio::io_context::run method, each thread pool uses its own instance of asio::io_context).
This way I can "guarantee" a minimum % of CPU time allocated for accepting TCP connections and managing the list of active TCP connections.
Thank you.
Hi @mabrarov,
All calls of processing_state::next_read method, except the 1st one initiated by processing_state::start, are performed within explicit strand. The 1st and the 2nd calls of processing_state::next_read method are performed within "implicit strand", though there is a chance that the 2nd call can return before the 1st call returns, but it's safe, because processing_state::next_read method doesn't have logic (doesn't access members of processing_state class) after calling device::async_read method.
You mention it's only safe in this particular case. I'm nervous doing this in this way in case at a later date the async wrapper is used in a slightly different way, in which case thread safety is no longer guaranteed.
Hi @pfeatherstone,
Refer to that part of my comment (maybe I modified the comment after you read it. Sorry for that. I just want to make my comments more correct and full, because there are similar questions in other Asio GitHub issues where I'd like to reference this thread):
Refer to the chat_session::start and chat_session::do_read_header methods in https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/example/cpp11/chat/chat_server.cpp for a similar example of an implicit strand.
For 100% safety (in case of future modification of the processing_state::next_read method), processing_state::start can use asio::dispatch or asio::defer or asio::post to wrap the 1st call of the processing_state::next_read method with the explicit strand (processing_state::executor).
This part:
void start()
{
next_read();
}
can be modified like:
void start()
{
asio::post(executor, [this](){ next_read(); });
}
to always call processing_state::next_read within the same explicit strand.
Ok now I'm really confused.
#include <thread>
#include <iostream>
#include <chrono>
#include <cassert>
#include <vector>
#include <boost/asio.hpp>
using namespace std;
using namespace std::chrono;
using namespace std::chrono_literals;
namespace asio = boost::asio;
struct processing_state
{
asio::strand<asio::io_context::executor_type> strand_proc;
asio::strand<asio::io_context::executor_type> strand_read;
size_t op_id{0};
void start()
{
do_read(op_id++);
}
void do_read(size_t id)
{
asio::post(strand_read, [this, id]{on_read(id);});
}
void on_read(size_t id)
{
assert(strand_read.running_in_this_thread());
cout << id << " : Reading thread id " << std::this_thread::get_id() << endl;
// Do some crazy operation
const auto start = high_resolution_clock::now();
while ((high_resolution_clock::now() - start) < 2s);
cout << id << " : Reading complete" << endl;
asio::post(strand_proc, [this, id]{loop(id);});
}
void loop(size_t id)
{
assert(strand_proc.running_in_this_thread());
// Launch next async op and come back on this executor
do_read(op_id++);
// Do some work on read stuff
cout << id << " : Processing thread id " << std::this_thread::get_id() << endl;
const auto start = high_resolution_clock::now();
while ((high_resolution_clock::now() - start) < 2s);
cout << id << " : Processing complete" << endl;
}
};
int main()
{
using strand = asio::strand<asio::io_context::executor_type>;
const size_t nthreads = 2;
asio::io_context ctx(nthreads);
std::vector<std::jthread> pool;
processing_state state{asio::make_strand(ctx), asio::make_strand(ctx)};
state.start();
for (size_t i = 0 ; i < (nthreads - 1) ; ++i)
pool.emplace_back([&]{ctx.run();});
ctx.run();
}
This works. I have two strands, they are posting between each other using asio::post(), and I'm maxing out both threads. Weirdly, a strand can be handled by multiple threads.
Fundamentally, I don't know what the difference is between this example and the previous one that used a composed operation to post onto a strand before performing an op. In this case, I'm doing it manually. Something is different.
I might try this manual approach of posting between strands at work tomorrow to control my uhd device. I thought the composed op was a nice wrapper but obviously i'm doing something wrong.
asio::async_compose seems to be a "newer" API which was introduced later and which provides more features / flexibility compared to asio::post and asio::dispatch. asio::async_compose is also more oriented towards extending Asio and / or designing new Asio-like APIs (asynchronous methods), while asio::post and asio::dispatch are part of the classic task-based API. I would recommend starting with the Asio documentation and carefully reading all of it, including the examples, which are great at clarifying concepts and approaches / design patterns.
Cheers. The examples are usually great but I find the documentation a bit naff.
I might try this manual approach of posting between strands at work tomorrow to control my uhd device. I thought the composed op was a nice wrapper but obviously i'm doing something wrong.
By the way, this worked a treat. I just use post() to go between the "read" strand and the "processing" strand. All threads are maxed out, I don't need any manual synchronisation and the UHD device is streaming really smoothly.
I didn't have to use defer(). I can appreciate the docs say defer() is used if your handler is a "continuation". Practically, I don't know what that means or what it implies but I didn't need it.
Still confused as to why my async operation wrapper didn't work. I was trying to add a layer of safety such that you always "post"ed onto the read strand first. But anyways. I think some clarifications, docs or examples on using async_compose vs normal task pipelines using post(), dispatch() and defer() would be great.
Hi @pfeatherstone ,
Your original example (https://github.com/mabrarov/asio-1240/blob/main/main.cpp) doesn't work as you expect because of asio::bind_executor at https://github.com/mabrarov/asio-1240/blob/main/main.cpp#L99 and not because of asio::async_compose. That's the reason I'm interested in this thread, because it demonstrates:
- How 2 strands are (not) combined together (I totally agree with the way it works in Asio)
- Misunderstanding of what asio::bind_executor actually does, e.g. what is the difference b/w asio::bind_executor(executor, f) and asio::defer(executor, f) or asio::dispatch(executor, f) or asio::post(executor, f) (I totally agree with respective naming and behavior in Asio).
I guess both points are good candidates for enhancement of the Asio documentation (FYI @chriskohlhoff).
Thank you.
I swear the documentation somewhere says bind_executor() has the same effect as calling post().
Hi @pfeatherstone ,
Your original example (https://github.com/mabrarov/asio-1240/blob/main/main.cpp) doesn't work as you expect, because of asio::bind_executor at https://github.com/mabrarov/asio-1240/blob/main/main.cpp#L99 and not because of asio::async_compose. That's the reason I'm interested in this thread - because it demonstrates:
- How 2 strands are (not) combined together (I totally agree with the way it works in Asio)
- Misunderstanding of what asio::bind_executor actually does, e.g. what is the difference b/w asio::bind_executor(executor, f) and asio::defer(executor, f) or asio::dispatch(executor, f) or asio::post(executor, f) (I totally agree with respective naming and behavior in Asio).
I guess, both points are good candidates for enhancement of Asio documentation (FYI @chriskohlhoff).
Thank you.
I would also add strand.wrap(f)
Reading the docs again you find the statement:
if a completion handler goes through a strand, then all intermediate handlers should also go through the same strand
So because I was calling bind_executor(strand_proc), the composed operation was ensuring that all intermediate ops, including posting to strand_read, were also going through the strand_proc strand, and so both the read and the processing were happening on the strand_proc strand.
Gotcha. So maybe the docs were clear enough. But that's subtle.
Out of interest, does anybody know how to do this using coroutines? I guess I would need two coroutines, one for each strand? Somehow, they talk to each other. Maybe using a channel? Or is there a simpler, better way? Cheers
I have a thread pool where each thread is running asio::io_context::run(). I have an application with different strands flying about. It would seem that all my strands are running on the same thread, so I'm not getting any parallelisation at all. Is there a way to avoid this and pin a strand to a specific thread?