microsoft / cpprestsdk

The C++ REST SDK is a Microsoft project for cloud-based client-server communication in native code using a modern asynchronous C++ API design. This project aims to help C++ developers connect to and interact with services.

deadlock in ~basic_file_buffer #111

Open vladimir-kovalev opened 8 years ago

vladimir-kovalev commented 8 years ago

Hi,

Either I'm doing something wrong or there's a deadlock. I've implemented a simple REST server using cpprestsdk. Basically it just serves a couple of endpoints and static content. Here's the implementation for static serving:

// Includes needed by this snippet; Endpoint, file_from_request, file_mime and
// file_cache are helpers defined elsewhere in my code.
#include <unistd.h>                 // access(), F_OK, R_OK
#include <functional>               // std::bind, std::placeholders
#include <string>
#include <cpprest/http_listener.h>
#include <cpprest/filestream.h>

using web::http::status_codes;      // used unqualified below

class StaticServer : public Endpoint {
public:
    StaticServer(): Endpoint(web::http::methods::GET, "/",
            std::bind(&StaticServer::serve, this, std::placeholders::_1)) {}

private:
    pplx::task<void> serve(web::http::http_request request) {
        const std::string file = file_from_request(request.request_uri());

        if (access(file.c_str(), F_OK) != 0) {
            return request.reply(status_codes::NotFound);
        }
        if (access(file.c_str(), R_OK) != 0) {
            return request.reply(status_codes::Forbidden);
        }

        web::http::http_response response(status_codes::OK);
        response.headers().set_content_type(file_mime(file));
        response.headers().set_cache_control(file_cache(file));

        // Open the file asynchronously, attach the stream as the response
        // body, and reply. The stream is only closed later, from
        // ~basic_file_buffer, when the response is destroyed.
        return concurrency::streams::fstream::open_istream(file, std::ios_base::in | std::ios_base::binary)
        .then([=](concurrency::streams::istream stream) mutable {
            response.set_body(stream);
            return request.reply(response);
        });
    }
};

And here's how listeners are created:

virtual void listen() {
    for (auto endpoint: endpoints_) {
        for (auto method: endpoint->methods()) {
            auto listener = createListener(endpoint->route()); // returns std::unique_ptr<http_listener>
            listener->support(method, [=](http_request request) {
                endpoint->handler()(request).then([=] { // endpoint->handler() returns std::function<...> which handles this endpoint. See StaticServer::serve
                    LOG << "request finished: " << request.request_uri().to_string() << std::endl;
                });
            });
            listener->open().wait();
            listeners_.push_back(std::move(listener));
        }
    }
}

I've restricted the thread pool used by cpprestsdk from 40 to 4 threads, because the server runs on an embedded device (a sketch of this setup appears later in this comment). The server's main thread calls listen() and then just sleeps in an infinite loop. When I open the browser and send a request (just http://localhost:port/), it works a couple of times and then locks up. Here's the backtrace of one of the threads:

==10080==    at 0x55A2DA0: pthread_cond_wait@@GLIBC_2.3.2 (pthread_cond_wait.S:185)
==10080==    by 0x5EAFCDB: std::condition_variable::wait(std::unique_lock<std::mutex>&) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.21)
==10080==    by 0x432F12: void std::condition_variable::wait<pplx::details::event_impl::wait(unsigned int)::{lambda()#1}>(std::unique_lock<std::mutex>&, pplx::details::event_impl::wait(unsigned int)::{lambda()#1}) (condition_variable:98)
==10080==    by 0x428A1E: pplx::details::event_impl::wait(unsigned int) (pplxlinux.h:117)
==10080==    by 0x428AF2: pplx::details::event_impl::wait() (pplxlinux.h:133)
==10080==    by 0x42A06B: pplx::details::_TaskCollectionImpl::_Wait() (pplx.h:182)
==10080==    by 0x42A051: pplx::details::_TaskCollectionImpl::_RunAndWait() (pplx.h:177)
==10080==    by 0x42AD49: pplx::details::_Task_impl_base::_Wait() (pplxtasks.h:1778)
==10080==    by 0x4347E9: pplx::task<unsigned char>::wait() const (pplxtasks.h:3519)
==10080==    by 0x42C253: pplx::task<void>::wait() const (pplxtasks.h:4428)
==10080==    by 0x46D406: Concurrency::streams::details::basic_file_buffer<unsigned char>::~basic_file_buffer() (filestream.h:111)
==10080==    by 0x46D4AD: Concurrency::streams::details::basic_file_buffer<unsigned char>::~basic_file_buffer() (filestream.h:118)
==10080==    by 0x47A88B: std::_Sp_counted_ptr<Concurrency::streams::details::basic_file_buffer<unsigned char>*, (__gnu_cxx::_Lock_policy)2>::_M_dispose()
==10080==    by 0x4388F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:149)
==10080==    by 0x432DFA: std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() (shared_ptr_base.h:666)
==10080==    by 0x435573: std::__shared_ptr<Concurrency::streams::details::basic_streambuf<unsigned char>, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() (shared_ptr_base.h:914)
==10080==    by 0x4355B3: std::shared_ptr<Concurrency::streams::details::basic_streambuf<unsigned char> >::~shared_ptr() (shared_ptr.h:93)
==10080==    by 0x43AEEE: Concurrency::streams::streambuf<unsigned char>::~streambuf()
==10080==    by 0x46BF3F: Concurrency::streams::details::basic_istream_helper<unsigned char>::~basic_istream_helper() (streams.h:59)
==10080==    by 0x4672E5: void __gnu_cxx::new_allocator<Concurrency::streams::details::basic_istream_helper<unsigned char> >::destroy<Concurrency::streams::details::basic_istream_helper<unsigned char> >(Concurrency::streams::details::basic_istream_helper<unsigned char>*) (new_allocator.h:124)
==10080==    by 0x462AC6: std::enable_if<std::__and_<std::allocator_traits<std::allocator<Concurrency::streams::details::basic_istream_helper<unsigned char> > >::__destroy_helper<Concurrency::streams::details::basic_istream_helper<unsigned char> >::type>::value, void>::type std::allocator_traits<std::allocator<Concurrency::streams::details::basic_istream_helper<unsigned char> > >::_S_destroy<Concurrency::streams::details::basic_istream_helper<unsigned char> >(std::allocator<Concurrency::streams::details::basic_istream_helper<unsigned char> >&, Concurrency::streams::details::basic_istream_helper<unsigned char>*) (alloc_traits.h:282)
==10080==    by 0x45F0F4: void std::allocator_traits<std::allocator<Concurrency::streams::details::basic_istream_helper<unsigned char> > >::destroy<Concurrency::streams::details::basic_istream_helper<unsigned char> >(std::allocator<Concurrency::streams::details::basic_istream_helper<unsigned char> >&, Concurrency::streams::details::basic_istream_helper<unsigned char>*) (alloc_traits.h:411)
==10080==    by 0x458EE0: std::_Sp_counted_ptr_inplace<Concurrency::streams::details::basic_istream_helper<unsigned char>, std::allocator<Concurrency::streams::details::basic_istream_helper<unsigned char> >, (__gnu_cxx::_Lock_policy)2>::_M_dispose() (shared_ptr_base.h:524)
==10080==    by 0x4388F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:149)
==10080==    by 0x4F62ADD: web::http::details::_http_response::~_http_response()
==10080==    by 0x4677F4: void __gnu_cxx::new_allocator<web::http::details::_http_response>::destroy<web::http::details::_http_response>(web::http::details::_http_response*) (new_allocator.h:124)
==10080==    by 0x462E1D: std::enable_if<std::__and_<std::allocator_traits<std::allocator<web::http::details::_http_response> >::__destroy_helper<web::http::details::_http_response>::type>::value, void>::type std::allocator_traits<std::allocator<web::http::details::_http_response> >::_S_destroy<web::http::details::_http_response>(std::allocator<web::http::details::_http_response>&, web::http::details::_http_response*) (alloc_traits.h:282)
==10080==    by 0x45F44E: void std::allocator_traits<std::allocator<web::http::details::_http_response> >::destroy<web::http::details::_http_response>(std::allocator<web::http::details::_http_response>&, web::http::details::_http_response*) (alloc_traits.h:411)
==10080==    by 0x45972A: std::_Sp_counted_ptr_inplace<web::http::details::_http_response, std::allocator<web::http::details::_http_response>, (__gnu_cxx::_Lock_policy)2>::_M_dispose() (shared_ptr_base.h:524)
==10080==    by 0x4388F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:149)
==10080==    by 0x4F73A44: std::_Sp_counted_ptr_inplace<pplx::details::_Task_completion_event_impl<web::http::http_response>, std::allocator<pplx::details::_Task_completion_event_impl<web::http::http_response> >, (__gnu_cxx::_Lock_policy)2>::_M_dispose() 
==10080==    by 0x4388F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:149)
==10080==    by 0x4F62C17: web::http::details::_http_request::~_http_request()
==10080==    by 0x4388F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:149)
==10080==    by 0x50979E1: web::http::experimental::listener::details::connection::handle_http_line(boost::system::error_code const&)
==10080==    by 0x509A301: boost::asio::detail::read_until_match_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, std::allocator<char>, web::http::experimental::listener::details::crlf_nonascii_searcher_t, web::http::experimental::listener::details::connection::start_request_response()::{lambda(boost::system::error_code const&, unsigned long)#2}>::operator()(boost::system::error_code const&, unsigned long, int)
==10080==    by 0x509BD86: boost::asio::detail::reactive_socket_recv_op<boost::asio::mutable_buffers_1, boost::asio::detail::read_until_match_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, std::allocator<char>, web::http::experimental::listener::details::crlf_nonascii_searcher_t, web::http::experimental::listener::details::connection::start_request_response()::{lambda(boost::system::error_code const&, unsigned long)#2}> >::do_complete(boost::asio::detail::task_io_service*, boost::asio::detail::task_io_service_operation*, boost::system::error_code const&, unsigned long)
==10080==    by 0x4FDC694: boost::asio::detail::epoll_reactor::descriptor_state::do_complete(boost::asio::detail::task_io_service*, boost::asio::detail::task_io_service_operation*, boost::system::error_code const&, unsigned long)
==10080==    by 0x4FDC322: boost::asio::detail::task_io_service::run(boost::system::error_code&)
==10080==    by 0x505BD25: crossplat::threadpool::thread_start(void*)
==10080==    by 0x559D6A9: start_thread (pthread_create.c:333)
==10080==    by 0x649BE9C: clone (clone.S:109)

All 4 threads in the thread pool are blocked in this state, with identical backtraces, waiting for a signal. So am I doing something wrong? Here's the relevant code (include/cpprest/filestream.h):

        virtual ~basic_file_buffer()
        {
            if( this->can_read() )
            {
                this->_close_read().wait(); // Line 111: locks here
            }

            if (this->can_write())
            {
                this->_close_write().wait();
            }
        }

So all 4 threads are waiting, and no free thread is left in the thread pool to run the task that would call set on the event (this is just my guess). ~basic_file_buffer is called because I set a concurrency::streams::istream as the body of the response, so when the response is destroyed the stream is destroyed too, which in turn destroys the underlying buffer.
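
For what it's worth, the pattern can be reproduced with pplx alone. A minimal standalone sketch (not my server code), assuming the pool is pinned to 4 threads - newer cpprestsdk releases expose crossplat::threadpool::initialize_with_threads_count for this; I patched the hardcoded 40 in threadpool.h instead:

#include <pplx/pplxtasks.h>
#include <pplx/threadpool.h>
#include <vector>

int main()
{
    // Must run before any other cpprest/pplx call touches the shared pool.
    crossplat::threadpool::initialize_with_threads_count(4);

    std::vector<pplx::task<void>> outer;
    for (int i = 0; i < 4; ++i)
    {
        outer.push_back(pplx::create_task([] {
            // Each outer task occupies one pool thread and then blocks in
            // wait() on an inner task. The inner lambda also needs a free
            // pool thread to run, so once all 4 threads are parked in
            // wait(), nothing is left to signal them.
            pplx::create_task([] {}).wait();
        }));
    }
    for (auto& t : outer)
        t.wait(); // never returns: the pool has starved itself
    return 0;
}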

The server (and so cpprestsdk) is compiled and running on x86_64 Linux (Ubuntu 15.10), GCC 4.9.3. I can reproduce this 100% of the time: it serves the files 2-3 times (around 20-30 files each time) and then locks up.

Thanks in advance.

vladimir-kovalev commented 8 years ago

And basically I think that no code inside the library (cpprestsdk) should implicitly call wait or get, because there may be a situation where all available threads in the thread pool are blocked (e.g. waiting) and no free thread exists to signal the ones that are waiting.

ras0219-msft commented 8 years ago

Generally, you're completely right about avoiding calls to wait or get -- however, destructors are a bit more complex. In objects that control external asynchronous operations, the destructor needs to ensure all operations have completed before returning, since otherwise you could very easily get undefined behavior.

This issue looks quite complex, and I'm not immediately sure how to address it. One approach you could try is to manually call close() on the stream after the response is completed.
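
Roughly, applied to the serve() handler above (a sketch, assuming the task returned by reply() completes once the response has actually been sent):

        // Close the stream explicitly once the reply task finishes, so that
        // ~basic_file_buffer has nothing left to wait on.
        return concurrency::streams::fstream::open_istream(file, std::ios_base::in | std::ios_base::binary)
        .then([=](concurrency::streams::istream stream) mutable {
            response.set_body(stream);
            return request.reply(response).then([stream]() mutable {
                return stream.close();
            });
        });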

vladimir-kovalev commented 8 years ago

@ras0219-msft Thanks for the response. I thought about manually closing the stream, but there's a problem: you should only close it once the response has been sent (and so you no longer need the stream), yet there's no easy way to tell when that happens. The reply() function just sets the response (signals an event) and returns. Here's the related code (Release/src/http/listener/http_listener_msg.cpp):

pplx::task<void> details::_http_request::_reply_impl(http_response response)
{
    if (response.reason_phrase().empty())
    {
        response.set_reason_phrase(get_default_reason_phrase(response.status_code()));
    }

    pplx::task<void> response_completed;

#if !defined(__cplusplus_winrt) && _WIN32_WINNT >= _WIN32_WINNT_VISTA
    ... windows stuff ...
#endif
    {
        response_completed = pplx::task_from_result();
    }

    m_response.set(response);
    return response_completed;
}

I was not able to find, either in the documentation (GitHub pages/Doxygen) or in the sources, a way to be notified when the response has really been sent.

A possible workaround is a RAII class for the response:

class http_response_holder: public web::http::http_response { // Or http_response can be a member
public:
    ~http_response_holder() { someResourceManager->readyToDestroy(body()); } // body() returns concurrency::streams::istream
};

But that solves only one issue; the other is blocking operations in general. Suppose I have a function (like StaticServer::serve above) that serves requests, and imagine it performs a blocking operation (get() on a task, e.g. when opening the stream, extracting the body via extract_xxx, whatever). Now consider N requests arriving at the same time with N threads in the thread pool. The function runs on all N threads, each serving its own request, and we are locked up again (not strictly a deadlock, but close to one): all threads are waiting for a signal, while the thread(s) that should call notify_xxx on the condition variable can never run.

Possible solutions:

  1. Add soft and hard limits to the thread pool implementation. The current limit becomes the soft limit: ordinary scheduling may not exceed it, but the library's wait/get operations may spawn a few extra threads (up to the hard limit) to break the starvation.
  2. Do not use a shared thread pool. Currently every listener shares the same pool:
void hostport_listener::start()
{
    auto& service = crossplat::threadpool::shared_instance().service();
    ...
    auto socket = new ip::tcp::socket(service);
    ...
}

So we could add an option to http_listener_config - the maximum number of concurrent requests the server is supposed to handle (e.g. 10 means no more than 10 concurrent requests are allowed; any beyond that have to wait). With this option each listener could get its own thread pool. Rough sketches of both ideas follow.
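
A sketch of idea 1, with all names hypothetical (shutdown and shrinking back to the soft limit omitted for brevity): a pool whose tasks announce that they are about to block, letting it grow up to a hard cap so a worker is always available to complete pending operations.

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class soft_hard_pool {
public:
    soft_hard_pool(std::size_t soft, std::size_t hard) : hard_(hard)
    {
        for (std::size_t i = 0; i < soft; ++i) add_thread();
    }

    void post(std::function<void()> f)
    {
        { std::lock_guard<std::mutex> lk(m_); q_.push(std::move(f)); }
        cv_.notify_one();
    }

    // Library code would call this instead of a bare task.wait(): while the
    // caller blocks, one extra worker (up to the hard limit) keeps the pool
    // making progress.
    template <class Task>
    void blocking_wait(Task& t)
    {
        {
            std::lock_guard<std::mutex> lk(m_);
            if (threads_.size() < hard_) add_thread();
        }
        t.wait();
    }

private:
    void add_thread()
    {
        threads_.emplace_back([this] {
            for (;;) {
                std::function<void()> f;
                {
                    std::unique_lock<std::mutex> lk(m_);
                    cv_.wait(lk, [this] { return !q_.empty(); });
                    f = std::move(q_.front());
                    q_.pop();
                }
                f();
            }
        });
    }

    std::size_t hard_;
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> q_;
    std::vector<std::thread> threads_;
};

And a sketch of idea 2, assuming boost::asio (which the listener already uses); per_listener_pool is a hypothetical name, and hostport_listener would have to accept such a pool instead of calling crossplat::threadpool::shared_instance():

#include <boost/asio.hpp>
#include <memory>
#include <thread>
#include <vector>

class per_listener_pool {
public:
    explicit per_listener_pool(std::size_t max_concurrent_requests)
        : work_(new boost::asio::io_service::work(service_))
    {
        // One worker per allowed concurrent request; further requests simply
        // queue up inside this io_service instead of starving a shared pool.
        for (std::size_t i = 0; i < max_concurrent_requests; ++i)
            threads_.emplace_back([this] { service_.run(); });
    }

    ~per_listener_pool()
    {
        work_.reset(); // let run() return once the queued handlers drain
        for (auto& t : threads_) t.join();
    }

    boost::asio::io_service& service() { return service_; }

private:
    boost::asio::io_service service_;
    std::unique_ptr<boost::asio::io_service::work> work_;
    std::vector<std::thread> threads_;
};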

ahmedyarub commented 5 years ago

I'm reviving this issue because it simply renders the SDK useless. When the service is overwhelmed with many requests, within a short time all the threads end up waiting forever. The only way to recover is to restart the application. I've tested with Restinio (and other non-C++ frameworks) and did not hit this problem. I'll try to put together an implementation that solves it in the coming days/weeks and share the results with you.

blockspacer commented 4 years ago

What's the current status of that issue?

jjmcwill commented 3 years ago

We've also been trying to limit the number of threads used by CPPRestSDK in our product, because some customers are sensitive to the amount of memory each worker thread consumes.

Unfortunately, that means all our worker threads (5) are deadlocked with the following stack:

#0  0x00007fdfde741995 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007fdfdd63482c in std::condition_variable::wait(std::unique_lock&) () from /lib64/libstdc++.so.6
#2  0x00000000006ab963 in pplx::details::_Task_impl_base::_Wait() ()
#3  0x00000000006f7e77 in std::_Sp_counted_ptr<Concurrency::streams::details::basic_file_buffer*, (__gnu_cxx::_Lock_policy)2>::_M_dispose() ()
#4  0x0000000000654af6 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() ()
#5  0x0000000000654af6 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() ()
#6  0x00000000006ae305 in web::http::details::http_msg_base::~http_msg_base() ()
#7  0x00000000006ae47a in std::_Sp_counted_ptr_inplace<web::http::details::_http_response, std::allocator, (__gnu_cxx::_Lock_policy)2>::_M_dispose() ()
#8  0x00007fdfe00b3ec6 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() () from /remote/release/foo/BUILDNUM/linux64/bin/../lib/libcpprest.so.2.10
#9  0x00007fdfe00b4f8c in std::_Sp_counted_ptr_inplace<pplx::details::_Task_completion_event_impl, std::allocator<pplx::details::_Task_completion_event_impl >, (__gnu_cxx::_Lock_policy)2>::_M_dispose() () from /remote/release/foo/BUILDNUM/linux64/bin/../lib/libcpprest.so.2.10
#10 0x0000000000654af6 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() ()
#11 0x00000000006c73d8 in web::http::details::_http_request::~_http_request() ()
#12 0x0000000000654af6 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() ()
#13 0x00000000006aa326 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::operator=(std::__shared_count<(__gnu_cxx::_Lock_policy)2> const&) ()

Sure seems similar. Has any work been done on this?

ahmedyarub commented 3 years ago

Just to make it clear: I'm not working on this one. I completely abandoned this framework.

jjmcwill commented 3 years ago

@ahmedyarub What did you switch to?

ahmedyarub commented 3 years ago

Restinio. I'm using it for my personal projects and have only tested it with benchmarks, without deploying it to production.