Closed carun closed 4 months ago
Please use an asynchronous timer instead of the sleep_for function, for example:

void longRunningFunc(
    const drogon::HttpRequestPtr &req,
    std::function<void(const drogon::HttpResponsePtr &)> &&cb) const {
    using namespace std::chrono_literals;
    std::cout << "Received request.. " << std::this_thread::get_id()
              << std::endl;
    // Simulate a long-running process without blocking the IO thread
    trantor::EventLoop::getEventLoopOfCurrentThread()->runAfter(1s, [cb = std::move(cb)]() {
        auto res = drogon::HttpResponse::newHttpResponse();
        res->setContentTypeCode(drogon::ContentType::CT_APPLICATION_JSON);
        res->setStatusCode(drogon::HttpStatusCode::k200OK);
        res->setBody("");
        cb(res);
        std::cout << "..Responded" << std::this_thread::get_id() << std::endl;
    });
}
Please refer to Understanding-drogon-threading-model for more details.
Thanks @an-tao. In my case I call a long-running math computation (not a DB query or other IO). Is there a different helper function I can make use of? In other words, can req and cb be passed to a different worker thread pool so the current HTTP thread can be returned safely to handle IO?

This is crashing at the moment. Is there an example I can look into to see how a long-running task is handled on a separate thread and the results are sent back to the caller?
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>
#include <drogon/HttpController.h>
#include <trantor/net/EventLoop.h>

// Some pre-existing thread or thread pool
std::thread th;

void longRunningFunc(const drogon::HttpRequestPtr& req,
                     std::function<void(const drogon::HttpResponsePtr&)>&& cb) {
    // Do a heavy computation and, once done, send the response to the callback.
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(2s);
    // Once the computation is done, send the result
    drogon::app().getLoop()->queueInLoop([req = std::move(req), cb = std::move(cb)]() {
        auto res = drogon::HttpResponse::newHttpResponse();
        res->setContentTypeCode(drogon::ContentType::CT_APPLICATION_JSON);
        res->setStatusCode(drogon::HttpStatusCode::k200OK);
        res->setBody("");
        cb(res);
        std::cout << "..Responded" << std::this_thread::get_id() << std::endl;
    });
}

class Handler : public drogon::HttpController<Handler> {
public:
    METHOD_LIST_BEGIN
    ADD_METHOD_TO(Handler::handleApi, "/api", drogon::Get);
    METHOD_LIST_END

    void handleApi(const drogon::HttpRequestPtr& req,
                   std::function<void(const drogon::HttpResponsePtr&)>&& cb) const {
        using namespace std::chrono_literals;
        std::cout << "Received request.. " << std::this_thread::get_id() << std::endl;
        // Start the work on a new worker thread (or a pre-existing pool of threads)
        th = std::thread(longRunningFunc, std::move(req), std::move(cb));
        // Return this current thread, so it can be used by the Drogon IO event loop
        return;
    }
};

int main(int argc, char** argv) {
    drogon::app()
        .addListener("0.0.0.0", 8080)
        .setThreadNum(std::thread::hardware_concurrency() * 2)
        .setClientMaxBodySize(1 * 1024 * 1024 * 1024)
        .run();
}
In other words, can req and cb be passed to a different worker thread pool so the current HTTP thread can be returned safely to handle IO?

Of course you can do this; that is the advantage of asynchronous interfaces using callbacks.
Backtrace:
(gdb) bt
#0 __pthread_kill_implementation (no_tid=0, signo=6, threadid=140735265367616) at ./nptl/pthread_kill.c:44
#1 __pthread_kill_internal (signo=6, threadid=140735265367616) at ./nptl/pthread_kill.c:78
#2 __GI___pthread_kill (threadid=140735265367616, signo=signo@entry=6) at ./nptl/pthread_kill.c:89
#3 0x00007ffff763c476 in __GI_raise (sig=sig@entry=6) at ../sysdeps/posix/raise.c:26
#4 0x00007ffff76227f3 in __GI_abort () at ./stdlib/abort.c:79
#5 0x00007ffff78e4bbe in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#6 0x00007ffff78f024c in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#7 0x00007ffff78f02b7 in std::terminate() () from /lib/x86_64-linux-gnu/libstdc++.so.6
#8 0x000055555557f13f in std::thread::operator= (__t=..., this=0x5555556a6cd0 <th>) at /usr/include/c++/11/bits/std_thread.h:165
#9 Handler::handleApi(std::shared_ptr<drogon::HttpRequest> const&, std::function<void (std::shared_ptr<drogon::HttpResponse> const&)>&&) const (this=<optimized out>, req=warning: RTTI symbol not found for class 'std::_Sp_counted_deleter<drogon::HttpRequestImpl*, drogon::HttpRequestParser::makeRequestForPool(drogon::HttpRequestImpl*)::{lambda(drogon::HttpRequestImpl*)#1}, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>'
warning: RTTI symbol not found for class 'std::_Sp_counted_deleter<drogon::HttpRequestImpl*, drogon::HttpRequestParser::makeRequestForPool(drogon::HttpRequestImpl*)::{lambda(drogon::HttpRequestImpl*)#1}, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>'
std::shared_ptr<drogon::HttpRequest> (use count 7, weak count 0) = {...}, cb=...)
at /home/arun/code/oss/drogon-sample/main.cpp:34
#10 0x000055555558257f in drogon::internal::HttpBinder<void (Handler::*)(std::shared_ptr<drogon::HttpRequest> const&, std::function<void (std::shared_ptr<drogon::HttpResponse> const&)>&&) const>::callFunction<std::function<void (std::shared_ptr<drogon::HttpResponse> const&)>&, true, true, true>(std::shared_ptr<drogon::HttpRequest> const&, std::function<void (std::shared_ptr<drogon::HttpResponse> const&)>&) (req=warning: RTTI symbol not found for class 'std::_Sp_counted_deleter<drogon::HttpRequestImpl*, drogon::HttpRequestParser::makeRequestForPool(drogon::HttpRequestImpl*)::{lambda(drogon::HttpRequestImpl*)#1}, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>'
warning: RTTI symbol not found for class 'std::_Sp_counted_deleter<drogon::HttpRequestImpl*, drogon::HttpRequestParser::makeRequestForPool(drogon::HttpRequestImpl*)::{lambda(drogon::HttpRequestImpl*)#1}, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>'
std::shared_ptr<drogon::HttpRequest> (use count 7, weak count 0) = {...}, this=0x5555556baa50) at /opt/dev-setup/drogon-1.8.2/include/drogon/HttpBinder.h:380
#11 drogon::internal::HttpBinder<void (Handler::*)(std::shared_ptr<drogon::HttpRequest> const&, std::function<void (std::shared_ptr<drogon::HttpResponse> const&)>&&) const>::run<, 0ul, false>(std::deque<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >&, std::shared_ptr<drogon::HttpRequest> const&, std::function<void (std::shared_ptr<drogon::HttpResponse> const&)>&&) (this=0x5555556baa50, req=warning: RTTI symbol not found for class 'std::_Sp_counted_deleter<drogon::HttpRequestImpl*, drogon::HttpRequestParser::makeRequestForPool(drogon::HttpRequestImpl*)::{lambda(drogon::HttpRequestImpl*)#1}, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>'
warning: RTTI symbol not found for class 'std::_Sp_counted_deleter<drogon::HttpRequestImpl*, drogon::HttpRequestParser::makeRequestForPool(drogon::HttpRequestImpl*)::{lambda(drogon::HttpRequestImpl*)#1}, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>'
std::shared_ptr<drogon::HttpRequest> (use count 7, weak count 0) = {...}, callback=...) at /opt/dev-setup/drogon-1.8.2/include/drogon/HttpBinder.h:296
#12 0x00005555555aac35 in drogon::HttpControllersRouter::doControllerHandler(std::shared_ptr<drogon::HttpControllersRouter::CtrlBinder> const&, drogon::HttpControllersRouter::HttpControllerRouterItem const&, std::shared_ptr<drogon::HttpRequestImpl> const&, std::__cxx11::match_results<__gnu_cxx::__normal_iterator<char const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::__cxx11::sub_match<__gnu_cxx::__normal_iterator<char const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > > const&, std::function<void (std::shared_ptr<drogon::HttpResponse> const&)>&&) ()
#13 0x00005555555adaa0 in drogon::HttpControllersRouter::route(std::shared_ptr<drogon::HttpRequestImpl> const&, std::function<void (std::shared_ptr<drogon::HttpResponse> const&)>&&) ()
#14 0x00005555555e53a8 in drogon::HttpSimpleControllersRouter::route(std::shared_ptr<drogon::HttpRequestImpl> const&, std::function<void (std::shared_ptr<drogon::HttpResponse> const&)>&&) ()
#15 0x0000555555625ff2 in drogon::HttpServer::onRequests(std::shared_ptr<trantor::TcpConnection> const&, std::vector<std::shared_ptr<drogon::HttpRequestImpl>, std::allocator<std::shared_ptr<drogon::HttpRequestImpl> > > const&, std::shared_ptr<drogon::HttpRequestParser> const&) ()
#16 0x000055555562799d in drogon::HttpServer::onMessage(std::shared_ptr<trantor::TcpConnection> const&, trantor::MsgBuffer*) ()
#17 0x0000555555658236 in trantor::TcpConnectionImpl::readCallback() ()
#18 0x00005555556493f0 in trantor::Channel::handleEventSafely() ()
#19 0x000055555564949e in trantor::Channel::handleEvent() ()
#20 0x000055555563db4c in trantor::EventLoop::loop() ()
#21 0x000055555563f192 in trantor::EventLoopThread::loopFuncs() ()
#22 0x00007ffff791e2b3 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#23 0x00007ffff768eb43 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#24 0x00007ffff7720a00 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
(gdb)
th = std::thread(longRunningFunc, std::move(req), std::move(cb));
You should call the th.detach() method to let the th object be safely destroyed.
auto th = std::thread([req, cb=std::move(cb)]() mutable {longRunningFunc(req, std::move(cb));});
th.detach();
Sorry, this is probably not the correct code to reproduce. Let me create a valid one. I tried simplifying the crash from my application, and that introduced the detach issue. :)

But do you think my idea is right?
You can do that, but IMHO I don't think the QPS of a compute-intensive application can be improved this way. The maximum QPS of a single machine should be the number of cores divided by the time of each calculation. If you set the thread_num of drogon to the number of CPU cores, the QPS is the same whether or not you move the calculation to other threads.
You are absolutely right. That was my expectation with my first example: that setting the thread count to the CPU core count would solve the problem. But only half the requests were delivered by Drogon in parallel, so QPS never hit the maximum.
In your first example, your CPU count is 10 and you set the number of threads to 12, so as you can see the maximum number of concurrent requests handled is 12, and according to the output of wrk the QPS is 10. This is as expected because your machine has 10 cores and every computation needs 1 s.

By the way, if you set the number of threads to 16, you will see 16 requests handled at the same time, but the QPS is still 10.
This is as expected because your machine has 10 cores and every computation needs 1 s.

My machine has 16 cores and I'm running with 2 additional threads = 18 threads. That's why I used wrk -t16 -c16.
It seems that some IO threads handle more than one connection and some have no connection. Drogon lets the Linux kernel dispatch each new connection to a specific IO thread. So yes, you could make a concurrent thread pool to avoid this condition, or use more connections than 16 in your test so that all IO threads have requests to handle at the same time, for example:

wrk -d10 -c48 -t1 http://localhost:8080/api

Note that wrk uses non-blocking IO as drogon does, so you can use one thread to handle 48 connections.
you could make a concurrent threads pool
I looked at Trantor's EventLoopThreadPool, but as you mentioned it relies on the kernel's connection dispatching and thus doesn't fit the use case. Is there a different generic thread pool in Trantor that can be used for this purpose?
trantor::ConcurrentTaskQueue
Nice. It would be good to add an example using this in the Drogon HTTP benchmark or somewhere similar.
The Drogon HTTP server limits the number of concurrent requests being submitted to the handler. This is observed with all recent versions of Drogon; I've tested with v1.8.0 and above.
See this code to reproduce
Build using
Test using
Expected behavior: all the requests from the client need to be accepted at once, since enough IO threads are available to handle them.
Screenshots
https://user-images.githubusercontent.com/471374/206050311-860e3e6a-1c6c-4850-a8ed-56718e2efd27.mp4
The screen recording shows that wrk submits 16 concurrent requests, but Drogon handles only 12 at a time. This varies with every run. On a box with 96 cores it gets even worse: submitting 300 requests gets only 90 requests to the handler.