Stiffstream / restinio

Cross-platform, efficient, customizable, and robust asynchronous HTTP(S)/WebSocket server C++ library with the right balance between performance and ease of use

Getting the connection state/status #28

Closed binarytrails closed 5 years ago

binarytrails commented 5 years ago

Hi devs,

We have one last operation on the server, called LISTEN, which keeps a connection open for the maximum delay defined in the handle_request_timeout setting. Everything works: I can send chunks and close the connection with response.connection_close(), and I can find its ID with request->connection_id(), which returns a connection_id_t.

However, we need to be able to check the connection state (open, closed) using the connection_id_t (or any other object) from a different thread, so that we can close connections gracefully when we shut down the server, and so that this observer thread can close the active listeners on the OpenDHT thread.

Thank you for the heads up! :rocket: Seva

eao197 commented 5 years ago

However, we need to be able to check the connection state (open, closed) using the connection_id_t (or any other object) from a different thread, so that we can close connections gracefully when we shut down the server, and so that this observer thread can close the active listeners on the OpenDHT thread.

I'm afraid I don't understand your scenario from this description :( Can you tell more about your needs?

binarytrails commented 5 years ago

@eao197 I simply want to query the state of a connection by its id (is it open or closed?) so that, on shutdown, I can close it if it is still open. Thanks! Example:

listen(...) {
    SessionToHashToken listener;
    listener.hash = infoHash;
    listener.connId = request->connection_id();
    listener.token = dht_->listen(infoHash, [this, response](...){...});
    ...
    std::lock_guard<std::mutex> lock(lockListener_);
    currentListeners_.emplace_back(std::move(listener));
}
...
DhtProxyServer::removeClosedListeners(bool testSession)
{
    // clean useless listeners
    std::lock_guard<std::mutex> lock(lockListener_);
    auto listener = currentListeners_.begin();
    while (listener != currentListeners_.end()){
        auto cancel = dht_ and (not testSession); //TODO or !open(listener->connId)
        if (cancel){
            dht_->cancelListen(listener->hash, std::move(listener->token));
            // Remove listener if unused
            listener = currentListeners_.erase(listener);
        } else {
             ++listener;
        }
    }
}
eao197 commented 5 years ago

I still don't understand that desire :(

First of all, a synchronous query of the current state of something in an async, multithreaded environment is often useless: the state can change right after you receive the answer to your query. Because of that, adding query_connection_status() (or something similar) to the RESTinio API is not a good idea.

RESTinio automatically closes a connection when there are no more live request_handle_t or response_builder_t objects for it. So if you have a listener, you have a request_handle_t or response_builder_t for it. Just drop the request_handle_t/response_builder_t and RESTinio closes the corresponding connection, provided the connection is no longer in use (it can still be used by other requests due to request pipelining).

binarytrails commented 5 years ago

@eao197 I don't think deleting/resetting a shared pointer in one thread, in order to check in another whether the connection is open, is an elegant solution. In fact, it adds more uncertainty in the case where a client simply disconnects and closes the connection himself. I think it is very important to be able to query whether a connection is open or closed, instead of creating more overhead by trying to observe its state, or using exception handling to test for an open connection on the server side. The fact that the client can close it at any time is, on the contrary, a reason why such a method should exist: it enables an elegant check that drives the release of resources bound to a client with an open connection!

eao197 commented 5 years ago

In the current version of RESTinio you just hold the request_handle_t/response_builder_t as long as you need. When it is no longer needed, you drop it. If the connection is closed by the client, RESTinio will detect that and simply ignore the response you send.

Addition of a synchronous method like query_connection_status() to RESTinio is not a good idea from my point of view.

It can be useful to have some additional callbacks which can be set in server settings: one will be called when a connection is accepted and another will be called when a connection is lost. But it requires some refactoring of RESTinio's internals. And I'm afraid we can't do it in May 2019, maybe in June.

binarytrails commented 5 years ago

@eao197

Now that I have become quite familiar with Asio, I agree that additional callbacks for an accepted and, most importantly, a lost connection are the way to go. The latter callback, on a connection drop, would allow us to properly release the resources attached to the specific connection by propagating their release internally.

eao197 commented 5 years ago

@binarytrails The master branch now contains the first working version of connection state listeners/notifications in RESTinio. An example of such a listener can be found here.

The idea is simple: a developer has to define their own type with a non-static state_changed method. This method should be noexcept, accept a restinio::connection_state::notice_t object by const reference, and return void (the definition of the notice_t type can be found here).

Then the name of the user's type should be specified as the connection_state_listener_t typedef in the server traits:

    struct my_traits_t : public restinio::default_traits_t
    {
        // Change the default connection state listener to our one.
        using connection_state_listener_t = connection_listener_t;
    };

And then an instance of that type should be created by the user and passed to the server-settings object via the connection_state_listener() method.

Note that if RESTinio works on a thread pool, then the state listener object can be called by RESTinio from different threads at the same time. So it's the user's task to make that object thread-safe.

Maybe it's not the final version of the connection state listener API, but we're going to release the feature as v.0.5.1 in a couple of days. So you can try it and give us your feedback -- maybe you'll find some fundamental flaws in the current design.

binarytrails commented 5 years ago

@eao197

Thank you for this feature!

The only thing that bugs me is how awkward it is to integrate into an existing class. Basically, we need to forward-declare an existing class, one that may already contain the data associated with the connection that we want to handle on disconnect, in the traits:

struct my_traits_t : public restinio::default_traits_t {
    using connection_state_listener_t = connection_listener_t;
};

Then we need to pass an instance of self, most likely via shared_from_this, to expose our state_changed method (the name is hard-coded), registered with this method in the settings:

Derived &&
connection_state_listener(std::shared_ptr<typename Traits::connection_state_listener_t> listener ) && {
    return std::move(this->connection_state_listener(std::move(listener)));
}

In that case, I might go with your approach of making a listener class and sharing the resources associated with a connection upon its creation and then reusing them there.

However, it seems that the overhead of an extra class for a single state_changed method could have been avoided by accepting a lambda as the state_changed handler, without requiring a forward declaration of its type in the traits! In the thread pool case, the synchronization mechanisms would be shared and implemented inside the lambda.

Cheers, Seva

eao197 commented 5 years ago

The only thing that bugs me is how awkward it is to integrate into an existing class.

I don't understand the problem here. Do you want to use the same class for both the request handler and the connection state listener? Something like:

class MyServer {
...
public:
   void onPost(...) {...}
   void onGet(...) {...}
   void onSign(...) {...}
   ...
   void state_changed(const restinio::connection_state::notice_t & notice) noexcept {...}
};

?

binarytrails commented 5 years ago

@eao197

Yes, but then the callback-registering method settings.connection_state_listener(...) requires a shared pointer to an instance of self, hence shared_from_this together with a forward class declaration.

binarytrails commented 5 years ago

Also, following your example I get "connection state listener is not specified"; see the git diff:

+class conn_listener_t
+{
+public:
+    conn_listener_t() = default;
+    /**
+     * Connection state change used to handle Listeners disconnects.
+     * RESTinio >= 0.5.1 https://github.com/Stiffstream/restinio/issues/28
+     */
+    void state_changed(const restinio::connection_state::notice_t &notice) noexcept {
+        printf("connection %li is %i\n", notice.connection_id(), notice.cause());
+    }
+};
+
 using RestRouter = restinio::router::express_router_t<>;
 struct RestRouterTraits : public restinio::default_traits_t
 {
@@ -82,6 +95,7 @@ struct RestRouterTraits : public restinio::default_traits_t
     using http_methods_mapper_t = http::custom_http_methods_t;
     using logger_t = opendht_logger_t;
     using request_handler_t = RestRouter;
+    using connection_state_listener_t = conn_listener_t;
 };
 using ServerSettings = restinio::run_on_thread_pool_settings_t<RestRouterTraits>;
 using RequestStatus = restinio::request_handling_status_t;
diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp
index 8f5a460..88c0230 100644
--- a/src/dht_proxy_server.cpp
+++ b/src/dht_proxy_server.cpp
@@ -87,6 +87,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c
+        settings.connection_state_listener(std::make_shared<conn_listener_t>());
         httpServer_ = restinio::http_server_t<RestRouterTraits>(
             restinio::own_io_context(),
             std::forward<ServerSettings>(settings)
eao197 commented 5 years ago

I don't think shared_from_this is necessary. There are several ways of achieving the result you need. For example:

Number 1: usage of MyHandler class directly with shared_ptr with empty deleter:

class MyHandler {
   ... // all methods you need with state_changed.
};

struct MyTraits : public restinio::default_traits_t {
   using connection_state_listener_t = MyHandler;
};

MyHandler handler{...}; // Actual handler object. Created on stack.
restinio::run(restinio::on_thread_pool<MyTraits>(...).port(...).address(...)
   .connection_state_listener(std::shared_ptr<MyHandler>{
      &handler, // Actual pointer to handler.
      [](MyHandler *) {} // No-op custom deleter.
   })
  ...);

Number 2: Inheritance with shared_ptr with empty deleter:

class MyListener {
public:
   virtual void state_changed(...) noexcept = 0;
};

class MyHandler : public MyListener {
   ... // all methods you need with state_changed.
};

struct MyTraits : public restinio::default_traits_t {
   using connection_state_listener_t = MyListener;
};

MyHandler handler{...}; // Actual handler object. Created on stack.
restinio::run(restinio::on_thread_pool<MyTraits>(...).port(...).address(...)
   .connection_state_listener(std::shared_ptr<MyListener>{
      &handler, // Actual pointer to handler.
      [](MyHandler *) {} // No-op custom deleter.
   })
  ...);

There is also a modification of that approach without empty deleter:

class MyListener : public std::enable_shared_from_this<MyListener> {
public:
   virtual void state_changed(...) noexcept = 0;
   auto getListenerSharedPtr() const noexcept { return shared_from_this(); }
};

class MyHandler : public MyListener {
   ... // all methods you need with state_changed.
};

struct MyTraits : public restinio::default_traits_t {
   using connection_state_listener_t = MyListener;
};

auto handler = std::make_shared<MyHandler>(...); // Actual handler object. Created in heap.
restinio::run(restinio::on_thread_pool<MyTraits>(...).port(...).address(...)
   .connection_state_listener(handler->getListenerSharedPtr())
  ...);

Number 3: separate object with std::function inside:

class MyListener : public std::enable_shared_from_this<MyListener> {
   std::function<void(const restinio::connection_state::notice_t&)> handler_;
public:
   template<typename Lambda>
   MyListener(Lambda && lambda) : handler_{std::forward<Lambda>(lambda)} {}

   void state_changed(... & notice) noexcept { handler_(notice); }
};

class MyHandler {
   std::shared_ptr<MyListener> listener_; 
   ...
public:
   MyHandler(...) {
      listener_ = std::make_shared<MyListener>([this](... &notice) {...});
   }
   ...
   auto getListener() const noexcept { return listener_; }
};

struct MyTraits : public restinio::default_traits_t {
   using connection_state_listener_t = MyListener;
};

MyHandler handler{...};
restinio::run(restinio::on_thread_pool<MyTraits>(...).port(...).address(...)
   .connection_state_listener(handler.getListener())
  ...);
eao197 commented 5 years ago

Also, following your example I get connection state listener is not specified

I'll check it later.

eao197 commented 5 years ago

I can't reproduce the error with "connection state listener is not set".

Also:

httpServer_ = restinio::http_server_t<RestRouterTraits>(

It seems that you create a temporary instance of http_server_t and then move it. I don't think that is a good idea. There are two constructors in http_server_t that allow direct initialization, so you can write:

DhtProxyServer::DhtProxyServer(...)
   : httpServer_{restinio::use_own_io_service(),
       ServerSettings{}.port(...).address(...).connection_state_listener(...)}

or

DhtProxyServer::DhtProxyServer(...)
   : httpServer_{restinio::use_own_io_service(),
       [&](ServerSetting & settings) {
          settings.port(...).address(...).connection_state_listener(...);
       }}

restinio::http_server_t should have its move and copy constructors/operators deleted. It looks like a mistake on our part in the current version of RESTinio.

binarytrails commented 5 years ago

@eao197

Well, I'm unable to create a connection listener using your method, but I will keep looking.

Yes, I instantiate a temporary: the private member restinio::http_server_t<RestRouterTraits> httpServer_; is first constructed with httpServer_(restinio::own_io_context(), []( auto & settings ){}). But this is mandatory because the settings are computed in the constructor of the proxy (thread counts, internal methods, etc.).

Therefore, http_server_t should have a default constructor, to allow a clean separation: the settings are built in the constructor that creates the server, while the server itself is still kept as a private member for later use, like this:

    httpServerThread_ = std::thread([this, port, logger](){
        using namespace std::chrono;
        auto maxThreads = std::thread::hardware_concurrency() - 1;
        auto restThreads = maxThreads > 1 ? maxThreads : 1;
        auto settings = ServerSettings(restThreads);
        settings.logger(logger);
        settings.port(port);
        settings.protocol(restinio::asio_ns::ip::tcp::v6());
        settings.request_handler(this->createRestRouter());
        // time limits                                      // ~ 0.8 month
        std::chrono::milliseconds timeout_request(std::numeric_limits<int>::max());
        settings.read_next_http_message_timelimit(timeout_request);
        settings.write_http_response_timelimit(60s);
        settings.handle_request_timeout(timeout_request);
        // socket options
        settings.socket_options_setter([](auto & options){
            options.set_option(asio::ip::tcp::no_delay{true});
        });
        httpServer_ = restinio::http_server_t<RestRouterTraits>(
            restinio::own_io_context(), // requirement: each thread has its own
            std::forward<ServerSettings>(settings)
        );
        restinio::asio_ns::post(httpServer_.io_context(), [&]{
            httpServer_.open_sync();
        });
        try {
            httpServer_.io_context().run();
        }
        catch(const std::exception &ex){
            std::cerr << "Error starting RESTinio: " << ex.what() << std::endl;
        }
    });
eao197 commented 5 years ago

There are several problems with your code.

RESTinio won't run on a thread pool. RESTinio will work on just one thread -- your httpServerThread_. The thread count in your ServerSettings instance will be ignored. That is because RESTinio's http_server doesn't create worker threads; that is done in the restinio::run() function. When you call restinio::run(restinio::on_thread_pool<Traits>(MaxThreads)...), the thread pool is created by run(), not by http_server.

It is dangerous to initialize the member variable httpServer_ inside another thread. It means that the thread where your DhtProxyServer is created will see wrong content in httpServer_ for some time. And because the update of httpServer_ is not an atomic operation, there is a chance of invoking some httpServer_ method while httpServer_ is not yet fully updated.

And it seems that most of your pre-initialization code could be moved to a helper function. Something like:

class DhtProxyServer {
   static ServerSettings makeHttpServerSettings(
      DhtProxyServer * self, in_port_t port, logger_t * logger)
   {
        using namespace std::chrono;
        auto maxThreads = std::thread::hardware_concurrency() - 1;
        auto restThreads = maxThreads > 1 ? maxThreads : 1;
        auto settings = ServerSettings(restThreads);
        settings.logger(logger);
        settings.port(port);
        settings.protocol(restinio::asio_ns::ip::tcp::v6());
        settings.request_handler(self->createRestRouter());
        // time limits                                      // ~ 0.8 month
        std::chrono::milliseconds timeout_request(std::numeric_limits<int>::max());
        settings.read_next_http_message_timelimit(timeout_request);
        settings.write_http_response_timelimit(60s);
        settings.handle_request_timeout(timeout_request);
        // socket options
        settings.socket_options_setter([](auto & options){
            options.set_option(asio::ip::tcp::no_delay{true});
        });
        return settings;
   }
...
public:
   DhtProxyServer(...) : ...
      , httpServer_{ restinio::use_own_io_context(), makeHttpServerSettings(this, port, logger) }
      , ...
   {
      httpServerThread_ = std::thread{ [this] {...} };
   }
   ...
};
binarytrails commented 5 years ago

@eao197

Okay, this is going off the topic of the listener callback, to which I will come back right after (it works, but it does not detect when the user closes the connection manually, outside of the typical negotiation). I'll put together a simple example to reproduce it asap.

I did it this way because I needed to be able to stop the io_context, cancelling all of the httpServer_'s operations; and since I defined ServerSettings as run_on_thread_pool_settings_t, I thought it would make sense that this type wouldn't ignore its own constructor argument for the number of threads to run on.

struct RestRouterTraits : public restinio::default_traits_t
{
    using timer_manager_t = restinio::asio_timer_manager_t;
    using http_methods_mapper_t = http::custom_http_methods_t;
    using logger_t = opendht_logger_t;
    using request_handler_t = RestRouter;
    using connection_state_listener_t = http::ConnectionListener;
};
using ServerSettings = restinio::run_on_thread_pool_settings_t<RestRouterTraits>;
...
auto settings = ServerSettings(restThreads);

However, I understand what you mean by restinio::run methods being the orchestrator of the thread pool. I'll look more into the code.

For the httpServer_ member, I changed my code to wrap it in a unique_ptr!

Thank you for your time and suggestions! Cheers,

eao197 commented 5 years ago

(it works, but it does not detect when the user closes the connection manually, outside of the typical negotiation)

The problem can be seen in the following scenario:

#include <restinio/all.hpp>

struct my_connection_listener
{
    void
    state_changed(const restinio::connection_state::notice_t & notice) noexcept
    {
        switch(notice.cause())
        {
        case restinio::connection_state::cause_t::accepted:
            std::cout << notice.connection_id() << " -- accepted" << std::endl;
        break;

        case restinio::connection_state::cause_t::closed:
            std::cout << notice.connection_id() << " -- closed" << std::endl;
        break;

        default: ;
        }
    }
};

int main()
{
    struct my_traits : public restinio::default_single_thread_traits_t
    {
        using logger_t = restinio::single_threaded_ostream_logger_t;
        using connection_state_listener_t = my_connection_listener;
    };

    std::vector< restinio::request_handle_t > requests;

    restinio::run(
        restinio::on_this_thread< my_traits >()
            .port(8080)
            .address("localhost")
            .handle_request_timeout(std::chrono::hours{24})
            .connection_state_listener(
                std::make_unique<my_connection_listener>())
//          .max_pipelined_requests(4)
            .cleanup_func([&] {
                    requests.clear();
                })
            .request_handler([&](auto req) {
                requests.push_back(req);
                return restinio::request_accepted();
            }));

    return 0;
}

If you compile and run this app, then start curl -v http://localhost:8080 and break curl with Ctrl+C, the app doesn't detect that the connection is lost.

It is because max_pipelined_requests is 1 by default. This means that when RESTinio reads a request, it stops reading from that connection until the response is produced. So RESTinio doesn't watch the corresponding socket while the request is being processed, and because of that RESTinio doesn't know about the disconnection.

If max_pipelined_requests is greater than 1, then RESTinio continues reading from the socket after parsing the first request. In that case, RESTinio can detect the disconnection and calls the state listener as expected.

You can see that if you uncomment the line with the max_pipelined_requests call.

I still don't know whether this is acceptable behavior for RESTinio in the presence of a state listener or not... :(

binarytrails commented 5 years ago

Exactly, I just arrived at the same conclusion with one of your examples, modified below. Basically, if the request's done() method is not called, or an internal callback tries to reuse the same request on a closed connection, the disconnect will not be detected. I completely agree it is a design choice to be made further on, but your last example is perfectly clear, and I would say that this settles my wondering.

#include <iostream>

#include <restinio/all.hpp>

// Class for connection state listener.
class connection_listener_t
{
    // Because method of connection_listener_t can be called from
    // different threads at the same time we have to protect connection
    // listener.
    std::mutex m_lock;

    static const char *
    cause_to_str( restinio::connection_state::cause_t cause ) noexcept
    {
        const char * result = "unknown";
        switch( cause )
        {
            case restinio::connection_state::cause_t::accepted:
                result = "accepted";
            break;

            case restinio::connection_state::cause_t::closed:
                result = "closed";
            break;

            case restinio::connection_state::cause_t::upgraded_to_websocket:
                result = "upgraded";
            break;
        }
        return result;
    }

public:
    // This method will be called by RESTinio.
    void state_changed(
        const restinio::connection_state::notice_t & notice ) noexcept
    {
        std::lock_guard<std::mutex> l{ m_lock };

        fmt::print( "connection-notice: {} (from {}) => {}\n",
                notice.connection_id(),
                notice.remote_endpoint(),
                cause_to_str( notice.cause() ) );
    }
};

// This is the request handler.
restinio::request_handling_status_t handler(
        restinio::asio_ns::io_context & ioctx,
        restinio::request_handle_t req )
{
    if( restinio::http_method_get() == req->header().method() &&
        req->header().request_target() == "/" )
    {
        auto timer = std::make_shared<restinio::asio_ns::steady_timer>( ioctx );
        timer->expires_after( std::chrono::milliseconds(10000) );
        timer->async_wait( [timer, req](const auto & ec) {
            if( !ec ) {
                req->create_response()
                    .append_header( restinio::http_field::server, "RESTinio hello world server" )
                    .append_header_date_field()
                    .append_header( restinio::http_field::content_type, "text/plain; charset=utf-8" )
                    .set_body( fmt::format( "{} (from {}): Hello world!",
                            req->connection_id(),
                            req->remote_endpoint() ) )
                    .done(); // FIXME: if you remove .done() the connection will never be closed
            }
        });
        return restinio::request_accepted();
    }

    return restinio::request_rejected();
}

int main()
{
    struct my_traits_t : public restinio::default_traits_t
    {
        // Change the default connection state listener to our one.
        using connection_state_listener_t = connection_listener_t;
    };

    try
    {
        restinio::asio_ns::io_context ioctx;
        restinio::run(
            ioctx,
            restinio::on_thread_pool< my_traits_t >( std::thread::hardware_concurrency() )
                .port( 8080 )
                .address( "localhost" )
                // Connection listener must be set manually.
                .connection_state_listener(
                        std::make_shared< connection_listener_t >() )
                .request_handler([&](auto req){
                    return handler(ioctx, std::move(req));
                })
        );
    }
    catch( const std::exception & ex )
    {
        std::cerr << "Error: " << ex.what() << std::endl;
        return 1;
    }

    return 0;
}

Thank you for this feature. Seva