Stiffstream / restinio

Cross-platform, efficient, customizable, and robust asynchronous HTTP(S)/WebSocket server C++ library with the right balance between performance and ease of use

user_controlled_output_t closes the connection on .flush() #22

Closed binarytrails closed 5 years ago

binarytrails commented 5 years ago

Hi Stiffstream,

I'm trying to implement a user_controlled_output_t as a response inside my get handler. However, the connection is getting closed on a flush(), as seen in this simple example:

request_status Server::get(restinio::request_handle_t request,
                         restinio::router::route_params_t params)
{
    printf("connection_id: %lu\n", request->connection_id());

    using output_t = restinio::user_controlled_output_t;
    auto response = this->init_http_resp(request->create_response<output_t>(

    ));
    //response.set_content_length(request->body().size());
    response.flush();
    std::this_thread::sleep_for(std::chrono::seconds(2));

    std::string part1 = "i like";
    response.set_body(part1);
    response.set_content_length(part1.size());
    response.flush();
    std::this_thread::sleep_for(std::chrono::seconds(2));

    std::string part2 = " waffles!";
    response.set_body(part2);
    response.set_content_length(part2.size());
    response.flush();
    std::this_thread::sleep_for(std::chrono::seconds(2));

    return response.done();
}

I'm integrating your amazing RESTinio server into the OpenDHT project, which needs to be able to send GET responses in parts: the head first, then one part each time a callback fires for a value found on a node, and so forth until the done callback.

I also went into debugging your code base with gdb and the flags are properly set:

gdb$ b /usr/local/include/restinio/impl/connection.hpp:794
Breakpoint 1 at 0xb10b1: file /usr/local/include/restinio/impl/connection.hpp, line 794.
...
gdb$ p response_output_flags
$1 = {m_response_parts = restinio::response_parts_attr_t::not_final_parts, m_response_connection = restinio::response_connection_attr_t::connection_keepalive}

But it seems that the connection is getting closed on a flush(), which is strange to me: I would expect flush() to differ from done() in that it does not close the connection, since it sends the response_parts_attr_t::not_final_parts flag, whereas done() sends the response_parts_attr_t::final_parts flag.

The goal is to be able to perform an async yield like this part that I am removing.

Thank you for your time!

Sincerely, Seva

eao197 commented 5 years ago

Hi!

Thanks for reporting this. We'll investigate this issue ASAP, but there are holidays in our country from 5 to 9 of May, and there are some other active issues in the progress. Sorry for the inconvenience.

eao197 commented 5 years ago

Hi!

I don't think there is a problem with RESTinio. The connection is closed by a client after receiving the first part of the response. I suppose this is because of "0" as a value for Content-Length header:

$ curl -v http://localhost:8080/
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.47.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Connection: keep-alive
< Content-Length: 0
< Server: RESTinio
< Access-Control-Allow-Origin: *
< 
* Connection #0 to host localhost left intact

But if you change Server::get in your example that way:

request_status Server::get(restinio::request_handle_t request,
                         restinio::router::route_params_t params)
{
    printf("connection_id: %lu\n", request->connection_id());

    std::string part1 = "i like";
    std::string part2 = " waffles!\r\n";

    using output_t = restinio::user_controlled_output_t;
    auto response = this->init_http_resp(request->create_response<output_t>(

    ));
    response.set_content_length(part1.size() + part2.size());
    response.flush();
    std::this_thread::sleep_for(std::chrono::seconds(2));

    response.set_body(part1);
    response.flush();
    std::this_thread::sleep_for(std::chrono::seconds(2));

    response.set_body(part2);
    response.flush();
    std::this_thread::sleep_for(std::chrono::seconds(2));

    return response.done();
}

You'll get an expected result:

$ curl -v http://localhost:8080/
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.47.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Connection: keep-alive
< Content-Length: 17
< Server: RESTinio
< Access-Control-Allow-Origin: *
< 
i like waffles!
* Connection #0 to host localhost left intact
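
The difference between the two runs comes down to when Content-Length is fixed: as the curl output shows, the headers go out on the first flush(), so the value set before it has to cover every part that follows. A minimal sketch of computing that total up front; total_content_length is a hypothetical helper written for illustration, not a RESTinio function.

```cpp
#include <cstddef>
#include <numeric>
#include <string>
#include <vector>

// Sum the sizes of all body parts so that Content-Length can be set once,
// before the first flush sends the headers.
std::size_t total_content_length(const std::vector<std::string>& parts) {
    return std::accumulate(
        parts.begin(), parts.end(), std::size_t{0},
        [](std::size_t acc, const std::string& part) { return acc + part.size(); });
}
```

For the two parts used above, "i like" (6 bytes) and " waffles!\r\n" (11 bytes), this gives the 17 reported by curl.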

binarytrails commented 5 years ago

Thank you for the prompt response! I understand how to use this type of controlled output.

If I wanted to remove the Content-Length field from the header, just send the data over an open connection and then close it, would that mean I have to patch the Content-Length header field, which is a special case according to the http_header_field_t definition?

I tried removing it with response.header().remove_field("Content-Length"); but without success.

On a DHT, one does not know the total size of values found for a certain hash prior to their callbacks (one per value) until the done callback when I'm planning to close the connection.

Cheers, Seva

eao197 commented 5 years ago

Maybe chunked output is more appropriate in your case?
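
For context, chunked transfer coding removes the need to know the total size in advance because every chunk carries its own length. Below is a minimal sketch of the HTTP/1.1 wire format only; encode_chunk and last_chunk are hypothetical helpers for illustration. With RESTinio's chunked_output_t the library does this framing itself, so application code would not normally write it by hand.

```cpp
#include <cstddef>
#include <cstdio>
#include <string>

// Frame one chunk for HTTP/1.1 chunked transfer coding:
// <size in hex> CRLF <data> CRLF
std::string encode_chunk(const std::string& data) {
    char size_hex[2 * sizeof(std::size_t) + 1];  // enough hex digits for any size_t
    std::snprintf(size_hex, sizeof size_hex, "%zx", data.size());
    return std::string(size_hex) + "\r\n" + data + "\r\n";
}

// A zero-length chunk terminates the body (no trailer fields in this sketch).
std::string last_chunk() {
    return "0\r\n\r\n";
}
```

A receiver keeps reading chunks until it sees the zero-length one, so the sender can emit values as they arrive and finish whenever the done callback fires.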

binarytrails commented 5 years ago

Yes, that could definitely work. However, I ran into a second issue with it: a flush happens after the connection has been closed. The async write takes place later than the done callback fires, so it tries to write after the get handler has already returned the request handling status:

Running on restinio on 3 threads
[2019-05-10 11:09:15.447] TRACE: starting server on 0.0.0.0:8080
[2019-05-10 11:09:15.447]  INFO: init accept #0
[2019-05-10 11:09:15.447]  INFO: server started on 0.0.0.0:8080
[2019-05-10 11:09:24.930] TRACE: accept connection from 127.0.0.1:46216 on socket #0
[2019-05-10 11:09:24.930] TRACE: [connection:1] start connection with 127.0.0.1:46216
[2019-05-10 11:09:24.930] TRACE: [connection:1] start waiting for request
[2019-05-10 11:09:24.930] TRACE: [connection:1] continue reading request
[2019-05-10 11:09:24.930] TRACE: [connection:1] received 139 bytes
[2019-05-10 11:09:24.930] TRACE: [connection:1] request received (#0): GET /wowo
Connection Id: 1
[2019-05-10 11:09:24.930] TRACE: [connection:1] append response (#0), flags: { not_final_parts, connection_keepalive }, write group size: 1
[2019-05-10 11:09:24.930] TRACE: [connection:1] start next write group for response (#0), size: 1
[2019-05-10 11:09:24.930] TRACE: [connection:1] start response (#0): HTTP/1.1 200 OK
[2019-05-10 11:09:24.930] TRACE: [connection:1] sending resp data, buf count: 1, total size: 153
Done lock aquired! tosend=0 flushed=0
ValuesToSend=1
ValuesToSend=2
Sending response! tosend=2 flushed=0
[2019-05-10 11:09:28.206] TRACE: [connection:1] append response (#0), flags: { final_parts, connection_keepalive }, write group size: 1
[2019-05-10 11:09:28.206] TRACE: [connection:1] outgoing data was sent: 153 bytes
[2019-05-10 11:09:28.206] TRACE: [connection:1] finishing current write group
[2019-05-10 11:09:28.206] TRACE: [connection:1] should keep alive
[2019-05-10 11:09:28.206] TRACE: [connection:1] start next write group for response (#0), size: 1
[2019-05-10 11:09:28.206] TRACE: [connection:1] sending resp data, buf count: 1, total size: 5
[2019-05-10 11:09:28.206] TRACE: [connection:1] append response (#0), flags: { not_final_parts, connection_keepalive }, write group size: 3
[2019-05-10 11:09:28.206] ERROR: [connection:1] unable to handle response: no context associated with request 0
[2019-05-10 11:09:28.206] TRACE: [connection:1] close
[2019-05-10 11:09:28.206] TRACE: [connection:1] close: close socket
[2019-05-10 11:09:28.206] TRACE: [connection:1] close: timer canceled
[2019-05-10 11:09:28.206] TRACE: [connection:1] close: reset responses data
[2019-05-10 11:09:28.206]  WARN: [connection:1] try to write response, while socket is closed
terminate called after throwing an instance of 'std::error_code'
Aborted (core dumped)

In this example I throw asio::error_code if not 0 from the flush completion handler.

I was wondering, is there a way to leave the connection alone as long as there is data scheduled to be flushed using the chunk output_t? I tried making a lock to wait for the values to be flushed using their callbacks but then it seems that I'm blocking the async write operations from happening.

Thank you for your time! :)

eao197 commented 5 years ago

I'm afraid I can't find time to investigate that case today. I'll return to it tomorrow.

Is it possible that the client closes the connection on its side between 11:09:24.930 and 11:09:28.206 (timestamps from your log)?

binarytrails commented 5 years ago

@eao197 thank you for helping me to find an elegant solution!

Basically, here is my method:

request_status DhtProxyServer::get(restinio::request_handle_t request,
                                   restinio::router::route_params_t params)
{
    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    // dht done prerequisites
    bool done_action = false;
    std::mutex done_mutex;
    std::condition_variable done_cv;

    using output_t = restinio::chunked_output_t;
    auto response = this->initHttpResponse(request->create_response<output_t>());
    response.flush();

    // callback per value
    this->dhtNode->get(infoHash, [this, &response] (const dht::Sp<dht::Value>& value){
        auto output = Json::writeString(this->jsonBuilder, value->toJson()) + "\n";
        response.append_chunk(output);
        // async completion handler
        response.flush([](const asio::error_code & ec){
            if (ec.value() != 0)
                throw ec;
        });
        return true;
    },
    // callback after all values
    [&] (bool /*ok*/){
        done_action = true;
        done_cv.notify_one();
    });
    std::unique_lock<std::mutex> done_lock(done_mutex);
    done_cv.wait_for(done_lock, std::chrono::seconds(10), [&]{return done_action;});

    return restinio::request_handling_status_t::accepted;
}

I thought of returning the accepted status instead of the one returned by response.done(), to avoid closing the connection. But then I still have to figure out how to detect that all values have been flushed, which happens with a slight delay (later than the return from the done callback).

Perhaps I'm missing something about not blocking the async write operation of the flush when I use my lock inside the get route handler?

Also, I tried passing values by reference to the nested asio flush callback, but by waiting for them I'm again blocking their processing, and I end up referencing values outside the method scope.

Cheers, Seva

eao197 commented 5 years ago

I think it's better to use a different approach here. You don't need a thread pool for processing incoming requests. Just one worker thread for RESTinio can be enough. But this thread shouldn't do the actual request processing. It should only accept a new request and delegate its processing to another thread. The response for an accepted request can be formed in a different context.

An example of this approach can be seen in a standard RESTinio example here. It uses just one RESTinio thread for accepting incoming requests and one worker thread for producing responses. There are some SObjectizer specifics (CSP channels in the form of SObjectizer's mchains), but you can think of mchains as thread-safe message queues.

eao197 commented 5 years ago

Here is a modified example of async processing of an incoming request with chunked encoding.

Processing of a new request is delegated to another thread. This thread sends the first part of the response immediately and other parts after a random pause.

binarytrails commented 5 years ago

@eao197 thank you so much for your examples and insights! I will have to take a little time to adapt this solution to our needs and get back to you. In terms of usage, the aim is to use RESTinio as a very high load proxy server and client over a distributed hash table network, exposing it to web services. That's why the previous proxy, which I'm removing, is async over a maximal thread pool, which ensures it can handle these types of loads in parallel. As I'm getting familiar with your code base, I must say that it is very well written! Cheers,

binarytrails commented 5 years ago

@eao197 As I'm implementing the producer thread to flush the responses, I have two questions:

  1. In a RESTful design, one would bind the handling of each resource to a specific function, as in the code below. However, I'm confused about how to dispatch with so_5::select and the nested so_5::case_ clauses in your example: processing_thread_func gets all of the requests, whereas we absolutely have to isolate each operation (get, put, etc.) in its own method, as we do now with our modular OOP design, to simplify complex operations by segmenting them into methods.

    template <typename HttpResponse>
    std::unique_ptr<RestRouter> DhtProxyServer::createRestRouter()
    {
    using namespace std::placeholders;
    auto router = std::make_unique<RestRouter>();
    router->add_handler(restinio::http_method_t::http_options,
                        "/:hash", std::bind(&DhtProxyServer::options, this, _1, _2));
    router->http_get("/", std::bind(&DhtProxyServer::getNodeInfo, this, _1, _2));
    router->http_get("/:hash", std::bind(&DhtProxyServer::get, this, _1, _2));
    router->http_put("/:hash", std::bind(&DhtProxyServer::put, this, _1, _2));
    return router;
    }
    ...
    settings.request_handler(this->createRestRouter());
    ...
    restinio::run(std::move(settings));
  2. Is there any particular reason for using so_5::send_delayed instead of so_5::send? If I understand correctly, you used it to demonstrate the delayed response that I had in a DHT callback, so it should be fine for me to use send without a delay. I assume that SObjectizer, along with the consumer thread, will be able to queue the generated responses to send.

Cheers, Seva

eao197 commented 5 years ago

Hi!

My example just demonstrates a method of delegating the actual request processing to another worker thread. There is no need to use SObjectizer; you can use any thread-safe event queue. But if you want to use SObjectizer, here are some explanations of your points.

1) It is not a problem to send different messages to req_ch and handle them inside select. For a very simple example it can look like:

struct GetNodeInfo {
   DhtProxyServer * server_;
   restinio::request_handle_t req_;
   restinio::router::route_params_t params_;
};
struct Get {
   DhtProxyServer * server_;
   restinio::request_handle_t req_;
   restinio::router::route_params_t params_;
};
struct Put {
   DhtProxyServer * server_;
   restinio::request_handle_t req_;
   restinio::router::route_params_t params_;
};
... // And so on.
...
template <typename HttpResponse>
std::unique_ptr<RestRouter> DhtProxyServer::createRestRouter(so_5::mchain_t reqChain)
{
    using namespace std::placeholders;
    auto router = std::make_unique<RestRouter>();
    ...
    // Each handler only forwards the request to the worker thread and
    // reports it as accepted; the response is produced later.
    router->http_get("/", [this, reqChain](auto req, auto params) {
            so_5::send<so_5::mutable_msg<GetNodeInfo>>(reqChain, this, std::move(req), std::move(params));
            return restinio::request_handling_status_t::accepted;
        });
    router->http_get("/:hash", [this, reqChain](auto req, auto params) {
            so_5::send<so_5::mutable_msg<Get>>(reqChain, this, std::move(req), std::move(params));
            return restinio::request_handling_status_t::accepted;
        });
    router->http_put("/:hash", [this, reqChain](auto req, auto params) {
            so_5::send<so_5::mutable_msg<Put>>(reqChain, this, std::move(req), std::move(params));
            return restinio::request_handling_status_t::accepted;
        });
    return router;
}
...
void processingThreadFunc(so_5::mchain_t reqChain) {
    select(from_all(),
        case_(reqChain,
            [](so_5::mutable_mhood_t<GetNodeInfo> cmd) {
                cmd->server_->getNodeInfo(std::move(cmd->req_), std::move(cmd->params_));
            },
            [](so_5::mutable_mhood_t<Get> cmd) {
                cmd->server_->get(std::move(cmd->req_), std::move(cmd->params_));
            },
            [](so_5::mutable_mhood_t<Put> cmd) {
                cmd->server_->put(std::move(cmd->req_), std::move(cmd->params_));
            },
            ...),
       case_(...),
       case_(...));
}

Or another scheme can be used:

struct HandleRequest {
    std::function<void()> handler_;
};
...
template <typename HttpResponse>
std::unique_ptr<RestRouter> DhtProxyServer::createRestRouter(so_5::mchain_t reqChain)
{
    using namespace std::placeholders;
    auto router = std::make_unique<RestRouter>();
    ...
    // Each handler only enqueues the work and reports the request as accepted.
    router->http_get("/", [this, reqChain](auto a_req, auto a_params) {
            so_5::send<HandleRequest>(reqChain, [this, req = std::move(a_req), params = std::move(a_params)] {
                    this->getNodeInfo(std::move(req), std::move(params));
                });
            return restinio::request_handling_status_t::accepted;
        });
    router->http_get("/:hash", [this, reqChain](auto a_req, auto a_params) {
            so_5::send<HandleRequest>(reqChain, [this, req = std::move(a_req), params = std::move(a_params)] {
                    this->get(std::move(req), std::move(params));
                });
            return restinio::request_handling_status_t::accepted;
        });
    router->http_put("/:hash", [this, reqChain](auto a_req, auto a_params) {
            so_5::send<HandleRequest>(reqChain, [this, req = std::move(a_req), params = std::move(a_params)] {
                    this->put(std::move(req), std::move(params));
                });
            return restinio::request_handling_status_t::accepted;
        });
    return router;
}
...
void processingThreadFunc(so_5::mchain_t reqChain) {
    select(from_all(),
        case_(reqChain,
            [](so_5::mhood_t<HandleRequest> cmd) {
                cmd->handler_();
            }),
       case_(...),
       ...);
}

BTW, SObjectizer's mchain can be read from different threads at the same time. It means that you can create a pool of worker threads, and every thread from that pool can run the processingThreadFunc shown above. In that case, you'll have very simple load balancing.

2) You're right, send_delayed is used just for delaying the processing of a request. You don't need to use send_delayed in your code.

binarytrails commented 5 years ago

@eao197 Just to be sure I understand correctly the response in relation to my original request of simply having a flush happen even if an async processing method scope is over:

  1. In your gist restinio_github_issue_22_2.cpp, you use a flush and then another mchain to handle the delayed request. By this, do you mean that within an mchain channel the connection will stay open even if the method scope is over, and that by using the so_5::send commands you avoid blocking the background flush processing? I'm asking this because a flush happens before a send which means in an original channel.

  2. In your last example you wrap the rest methods with structs to provide handler contexts then, you subtype them of mutable_mhood_t to send them through a channel chosen with a select which would avoid a connection being closed even if the scope ends but a flush is scheduled to happen?

Out of curiosity, wouldn't it be simpler to wrap the flush context of a chunked output type in RESTinio to make sure a connection is not closed while things are scheduled to be flushed, so that data can be sent even after the scope of the REST handler (put, get, etc.) is over?

eao197 commented 5 years ago

I'm asking this because a flush happens before a send which means in an original channel.

Let me explain the threading model in that example (I mean restinio_github_issue_22_2.cpp gist).

There are two threads: M (the main thread) and P (the processing thread). RESTinio runs on thread M. There RESTinio handles all I/O-related events (accepting connections, reading data and writing data back to the TCP socket). And all request_handlers are called on thread M too.

It means that if some event handler starts a long-running operation, it will block thread M, and RESTinio can't do anything until the event handler completes.

That is why the request is sent to thread P via so_5::send() and mchain.

Thread P reads new messages from req_ch and delayed_ch and handles them in a loop. It means that if thread P makes a blocking call while processing a request, it won't block thread M.

When thread P calls flush, the corresponding I/O operations are not performed in the context of P. Instead, RESTinio is notified that a part of the response is ready to be sent. RESTinio schedules an I/O operation, and that operation will be performed in the context of thread M.

So when thread P calls flush and then calls so_5::send to resend the next task to itself, it doesn't mean that the part of the response has already been written to the TCP socket. The call to flush just schedules an I/O operation that will be performed on thread M. But you don't know when it happens. It can be started and completed just before the return from flush. Or it can be started several milliseconds later. It's multithreading...
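
If thread P ever needs to know that a part was actually written, the completion handler accepted by flush (used earlier in this thread) is the signal to rely on. The following is only a sketch under assumptions: fake_flush is a hypothetical stand-in for an asynchronous flush, and flush_as_future is a hypothetical adapter that turns a completion handler into a std::future. The error is handed over as a value rather than thrown from the handler.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <system_error>
#include <thread>
#include <utility>

using completion_handler = std::function<void(const std::error_code&)>;

// Hypothetical stand-in for an asynchronous flush: it returns at once and
// runs the handler later on another thread (playing the role of thread M).
// The thread is detached only to keep the sketch short.
void fake_flush(completion_handler handler) {
    std::thread([h = std::move(handler)] { h(std::error_code{}); }).detach();
}

// Adapt a "call me back when done" operation into a future. The promise is
// shared with the handler so it stays alive until the handler has run.
std::future<std::error_code> flush_as_future(
    const std::function<void(completion_handler)>& start_flush) {
    auto done = std::make_shared<std::promise<std::error_code>>();
    std::future<std::error_code> result = done->get_future();
    start_flush([done](const std::error_code& ec) { done->set_value(ec); });
    return result;
}
```

Waiting on such a future is only reasonable on thread P; waiting on thread M would block the very thread that has to perform the write.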

BTW, if you can read Russian there is a new article on Habr with some explanations on this topic.

eao197 commented 5 years ago

In your last example you wrap the rest methods with structs to provide handler contexts then, you subtype them of mutable_mhood_t to send them through a channel chosen with a select which would avoid a connection being closed even if the scope ends but a flush is scheduled to happen?

This example just shows how information about a particular request can be delivered to a separate worker thread. And there are a lot of SObjectizer specifics.

You can think of mchains as thread-safe event queues. so_5::send<Msg>(mchain, args...) means something like this:

auto msg = std::make_unique<Msg>(args...);
mchain.enqueue(std::move(msg));

An mchain just allows a message instance to be passed from one thread to another.

So if you have your own thread-safe message queue you can do the same trick:

binarytrails commented 5 years ago

@eao197 Thank you for the detailed explanations and the article, I will read it!

binarytrails commented 5 years ago

@eao197 I implemented a dispatch queue in a thread-safe environment, and yet the RESTinio express router does not send any data until the routed request handler returns. I think this has to be solved by not waiting internally for the handler's return. And if I don't block the get request until the done_action flag is set, the return of the handling status closes the connection right away. I think a patch may be needed to allow routing to functions without tying connection handling to the return. I really like the design of the express router, but the fact that it closes the connection on return means a dispatch / response queue in a different thread does not work, even when requests are processed in a different thread than the one producing the responses. You can see in the output that the actual data is written to the socket only after the return:

    this->serverRespThread = std::thread(&DhtProxyServer::dispatchResponses, this);
    this->serverThread = std::thread([this, port](){
        using namespace std::chrono;
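        // hardware_concurrency() is unsigned and may return 0, so guard the
        // subtraction that reserves two threads (dht, resp) against underflow
        auto hwThreads = std::thread::hardware_concurrency();
        auto restThreads = hwThreads > 3 ? hwThreads - 2 : 1u;
        printf("Running on restinio on %u threads\n", restThreads);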
        auto settings = restinio::on_thread_pool<RestRouterTraits>(restThreads);
        settings.address("0.0.0.0");
        settings.port(port);
        settings.request_handler(this->createRestRouter());
        settings.read_next_http_message_timelimit(10s);
        settings.write_http_response_timelimit(10s);
        settings.handle_request_timeout(10s);
        settings.socket_options_setter([](auto & options){
            options.set_option(asio::ip::tcp::no_delay{true});
        });
        try {
            restinio::run(std::move(settings));
        }
        catch(const std::exception &ex) {
            std::cerr << "Error: " << ex.what() << std::endl;
        }
    });
...
void DhtProxyServer::scheduleRespDispatch(response_func&& response)
{
    std::unique_lock<std::mutex> lock(serverRespLock_);
    printf("adding request to execute\n");
    serverRespQueue_.push(std::move(response));
    lock.unlock();
    serverRespCv_.notify_all();
}

void DhtProxyServer::dispatchResponses()
{
    std::unique_lock<std::mutex> lock(serverRespLock_);
    do {
        // wait for a response
        serverRespCv_.wait(lock, [this]{
            return (serverRespQueue_.size() || stopServer_);
        });
        printf("dispatchResponses woken up, has %lu resps\n", serverRespQueue_.size());
        // with the lock
        if (!stopServer_ && serverRespQueue_.size()){
            auto resp_op = std::move(serverRespQueue_.front());
            serverRespQueue_.pop();
            lock.unlock();
            // execute the request
            printf("executing the request\n");
            resp_op();
            lock.lock();
        }
    } while (!stopServer_);
}
...
request_status DhtProxyServer::get(restinio::request_handle_t request,
                                   restinio::router::route_params_t params)
{
    printf("Connection Id: %lu\n", request->connection_id());

    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    // dht done prerequisites
    bool done_action = false;
    std::mutex done_mutex;
    std::condition_variable done_cv;

    auto response = this->initHttpResponse(request->create_response<response_t>());
    this->scheduleRespDispatch([&]{
        response.flush();
    });

    // callback per value
    this->dhtNode->get(infoHash, [this, &response] (const dht::Sp<dht::Value>& value){
        auto output = Json::writeString(this->jsonBuilder, value->toJson()) + "\n";
        response.append_chunk(output);
        this->scheduleRespDispatch([&]{
            response.flush();
        });
        return true;
    },
    [&] (bool /*ok*/){
    // callback after all values
        done_cv.notify_one();
        this->scheduleRespDispatch([&]{
            response.done();
            done_action = true;
        });
    });
    std::unique_lock<std::mutex> done_lock(done_mutex);
    done_cv.wait_for(done_lock, std::chrono::seconds(10), [&]{return done_action;});

    printf("finishing get function done=%i\n", done_action);
    return restinio::request_handling_status_t::accepted;
}

And the output demonstrating the blocking from a different processing thread until the return:

[2019-05-15 12:09:08.898] TRACE: [connection:1] continue reading request
[2019-05-15 12:09:08.898] TRACE: [connection:1] received 139 bytes
[2019-05-15 12:09:08.899] TRACE: [connection:1] request received (#0): GET /wowo
Connection Id: 1
adding request to execute
dispatchResponses woken up, has 1 resps
executing the request
adding request to execute
dispatchResponses woken up, has 1 resps
executing the request
adding request to execute
dispatchResponses woken up, has 1 resps
executing the request
adding request to execute
dispatchResponses woken up, has 1 resps
executing the request
finishing get function done=1
[2019-05-15 12:09:18.899] TRACE: [connection:1] append response (#0), flags: { not_final_parts, connection_keepalive }, write group size: 1
[2019-05-15 12:09:18.899] TRACE: [connection:1] start next write group for response (#0), size: 1
[2019-05-15 12:09:18.900] TRACE: [connection:1] start response (#0): HTTP/1.1 200 OK
[2019-05-15 12:09:18.900] TRACE: [connection:1] sending resp data, buf count: 1, total size: 153
[2019-05-15 12:09:18.900] TRACE: [connection:1] append response (#0), flags: { not_final_parts, connection_keepalive }, write group size: 3
[2019-05-15 12:09:18.900] TRACE: [connection:1] append response (#0), flags: { not_final_parts, connection_keepalive }, write group size: 3
[2019-05-15 12:09:18.900] TRACE: [connection:1] append response (#0), flags: { final_parts, connection_keepalive }, write group size: 1
[2019-05-15 12:09:18.901] TRACE: [connection:1] outgoing data was sent: 153 bytes
[2019-05-15 12:09:18.901] TRACE: [connection:1] finishing current write group
[2019-05-15 12:09:18.901] TRACE: [connection:1] should keep alive
[2019-05-15 12:09:18.901] TRACE: [connection:1] start next write group for response (#0), size: 7
[2019-05-15 12:09:18.901] TRACE: [connection:1] sending resp data, buf count: 7, total size: 130
[2019-05-15 12:09:18.901] TRACE: [connection:1] outgoing data was sent: 130 bytes
[2019-05-15 12:09:18.901] TRACE: [connection:1] finishing current write group
[2019-05-15 12:09:18.901] TRACE: [connection:1] should keep alive
[2019-05-15 12:09:18.901] TRACE: [connection:1] start waiting for request
[2019-05-15 12:09:18.901] TRACE: [connection:1] continue reading request
[2019-05-15 12:09:18.910] TRACE: [connection:1] EOF and no request, close connection
[2019-05-15 12:09:18.910] TRACE: [connection:1] close
[2019-05-15 12:09:18.910] TRACE: [connection:1] close: close socket
[2019-05-15 12:09:18.910] TRACE: [connection:1] close: timer canceled
[2019-05-15 12:09:18.910] TRACE: [connection:1] close: reset responses data
[2019-05-15 12:09:18.910] TRACE: [connection:1] destructor called

eao197 commented 5 years ago

You didn't change your main working scheme: you still block RESTinio's thread inside the get handler. You should move almost all of the code from the get() method to a separate thread. Something like:

request_status DhtProxyServer::get(restinio::request_handle_t request,
                                   restinio::router::route_params_t params)
{
    printf("Connection Id: %lu\n", request->connection_id());

    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    this->scheduleRespDispatch([this, infoHash, request] {
        auto response = this->initHttpResponse(request->create_response<response_t>());
        response.flush();

        // callback per value
        this->dhtNode->get(infoHash, [this, &response] (const dht::Sp<dht::Value>& value){
            auto output = Json::writeString(this->jsonBuilder, value->toJson()) + "\n";
            response.append_chunk(output);
            response.flush();
            return true;
        },
        [&] (bool /*ok*/){
            response.done();
        });
    });
    return restinio::request_handling_status_t::accepted;
}

In that case just one thread is needed for RESTinio, but a thread pool is necessary for calling this->dhtNode->get() somewhere.
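
One thing to watch in a scheme like this: if dhtNode->get() only registers the callbacks and returns before they are invoked (an assumption about OpenDHT, not something confirmed in this thread), then a response object local to the scheduled lambda is destroyed as soon as that lambda returns, and callbacks that captured it by reference are left with a dangling reference. A common remedy is to let the callbacks share ownership of the response. The sketch below uses hypothetical stand-ins (FakeResponse, FakeAsyncSource, start_lookup), not RESTinio or OpenDHT types.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a response builder.
struct FakeResponse {
    std::string body;
    bool finished = false;
    void append_chunk(const std::string& chunk) { body += chunk; }
    void done() { finished = true; }
};

// Hypothetical stand-in for an asynchronous lookup: get() only stores the
// callbacks and returns; deliver() invokes them later.
struct FakeAsyncSource {
    std::function<bool(const std::string&)> on_value;
    std::function<void(bool)> on_done;

    void get(std::function<bool(const std::string&)> value_cb,
             std::function<void(bool)> done_cb) {
        on_value = std::move(value_cb);
        on_done = std::move(done_cb);
    }

    void deliver(const std::vector<std::string>& values) {
        for (const auto& value : values)
            on_value(value);
        on_done(true);
    }
};

// Starts a lookup and returns immediately. The response lives on the heap
// and both callbacks hold a shared_ptr to it, so it stays valid after this
// function has returned. It is returned here only so it can be inspected.
std::shared_ptr<FakeResponse> start_lookup(FakeAsyncSource& source) {
    auto response = std::make_shared<FakeResponse>();
    source.get(
        [response](const std::string& value) {
            response->append_chunk(value + "\n");
            return true;
        },
        [response](bool /*ok*/) { response->done(); });
    return response;
}
```

With a real response builder the same idea would mean moving the builder into a std::shared_ptr and capturing that pointer by value in both callbacks.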

binarytrails commented 5 years ago

@eao197 yes, I tried that before, but I was getting segfaults with the exact same scenario. So I decided to decouple the responses and send each one as soon as it is available, which makes more sense because there may be tons of values on a certain node. Following your example, I get the same result as before:

request_status DhtProxyServer::get(restinio::request_handle_t request,
                                   restinio::router::route_params_t params)
{
    printf("Connection Id: %lu\n", request->connection_id());

    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    this->scheduleRespDispatch([this, infoHash, request]{
        auto response = this->initHttpResponse(request->create_response<response_t>());
        response.flush();
        this->dhtNode->get(infoHash, [this, &response] (const dht::Sp<dht::Value>& value){
            auto output = Json::writeString(this->jsonBuilder, value->toJson()) + "\n";
            response.append_chunk(output);
            response.flush();
            return true;
        },
        [&] (bool /*ok*/){
            response.done();
        });
    });
    printf("finishing get function \n");
    return restinio::request_handling_status_t::accepted;
}
[2019-05-15 13:09:00.885]  INFO: init accept #0
[2019-05-15 13:09:00.885]  INFO: server started on 0.0.0.0:8080
[2019-05-15 13:09:02.292] TRACE: accept connection from 127.0.0.1:34722 on socket #0
[2019-05-15 13:09:02.292] TRACE: [connection:1] start connection with 127.0.0.1:34722
[2019-05-15 13:09:02.292] TRACE: [connection:1] start waiting for request
[2019-05-15 13:09:02.292] TRACE: [connection:1] continue reading request
[2019-05-15 13:09:02.292] TRACE: [connection:1] received 139 bytes
[2019-05-15 13:09:02.292] TRACE: [connection:1] request received (#0): GET /wowo
Connection Id: 1
adding request to execute
finishing get function 
dispatchResponses woken up, has 1 resps
executing the request
[2019-05-15 13:09:02.292] TRACE: [connection:1] append response (#0), flags: { not_final_parts, connection_keepalive }, write group size: 1
[2019-05-15 13:09:02.292] TRACE: [connection:1] start next write group for response (#0), size: 1
[2019-05-15 13:09:02.292] TRACE: [connection:1] start response (#0): HTTP/1.1 200 OK
[2019-05-15 13:09:02.292] TRACE: [connection:1] sending resp data, buf count: 1, total size: 153
[2019-05-15 13:09:02.292] TRACE: [connection:1] outgoing data was sent: 153 bytes
[2019-05-15 13:09:02.292] TRACE: [connection:1] finishing current write group
[2019-05-15 13:09:02.292] TRACE: [connection:1] should keep alive
[2019-05-15 13:09:02.292] TRACE: [connection:1] destructor called

Thread 3 "proxy_restinio." received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff5e73700 (LWP 8681)]
-----------------------------------------------------------------------------------------------------------------------[regs]
  RAX: 0x0000555555597418  RBX: 0x0000555555597418  RBP: 0x00007FFFF5E6F5F0  RSP: 0x00007FFFF5E6F5E0  o d I t s z a p c 
  RDI: 0x0000555555597418  RSI: 0x00007FFFF5E6F770  RDX: 0x0000000000000000  RCX: 0x0000555555597418  RIP: 0x0000555555590020
  R8 : 0x0000000000000002  R9 : 0x0000000000000001  R10: 0x00007FFFF00008D0  R11: 0x0000000000000000  R12: 0x0000555555597418
  R13: 0x00007FFFF5E6F770  R14: 0x00007FFFF5E6F800  R15: 0x00007FFFF5E6F860
  CS: 0033  DS: 0000  ES: 0000  FS: 0000  GS: 0000  SS: 002B                
-----------------------------------------------------------------------------------------------------------------------[code]
=> 0x555555590020 <restinio::writable_item_t::writable_item_t(restinio::writable_item_t&&)+26>: mov    DWORD PTR [rax],edx
   0x555555590022 <restinio::writable_item_t::writable_item_t(restinio::writable_item_t&&)+28>: mov    rax,QWORD PTR [rbp-0x10]
   0x555555590026 <restinio::writable_item_t::writable_item_t(restinio::writable_item_t&&)+32>: mov    rdi,rax
   0x555555590029 <restinio::writable_item_t::writable_item_t(restinio::writable_item_t&&)+35>: call   0x555555590184 <restinio::writable_item_t::get_writable_base()>
   0x55555559002e <restinio::writable_item_t::writable_item_t(restinio::writable_item_t&&)+40>: mov    rdx,QWORD PTR [rax]
   0x555555590031 <restinio::writable_item_t::writable_item_t(restinio::writable_item_t&&)+43>: add    rdx,0x10
   0x555555590035 <restinio::writable_item_t::writable_item_t(restinio::writable_item_t&&)+47>: mov    rdx,QWORD PTR [rdx]
   0x555555590038 <restinio::writable_item_t::writable_item_t(restinio::writable_item_t&&)+50>: mov    rcx,QWORD PTR [rbp-0x8]
-----------------------------------------------------------------------------------------------------------------------------
0x0000555555590020 in restinio::writable_item_t::writable_item_t (this=0x555555597418 <std::__shared_ptr<restinio::request_t, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr()+28>, b=...) at /usr/local/include/restinio/buffers.hpp:522
522             :   m_write_type{ b.m_write_type }
gdb$ bt
#0  0x0000555555590020 in restinio::writable_item_t::writable_item_t (this=0x555555597418 <std::__shared_ptr<restinio::request_t, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr()+28>, b=...) at /usr/local/include/restinio/buffers.hpp:522
#1  0x00005555555afba1 in __gnu_cxx::new_allocator<restinio::writable_item_t>::construct<restinio::writable_item_t, restinio::writable_item_t> (this=0x7fffee7f92a0, __p=0x555555597418 <std::__shared_ptr<restinio::request_t, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr()+28>, __args#0=...) at /usr/include/c++/8.2.1/ext/new_allocator.h:136
#2  0x00005555555a5ee8 in std::allocator_traits<std::allocator<restinio::writable_item_t> >::construct<restinio::writable_item_t, restinio::writable_item_t> (__a=..., __p=0x555555597418 <std::__shared_ptr<restinio::request_t, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr()+28>, __args#0=...) at /usr/include/c++/8.2.1/bits/alloc_traits.h:475
#3  0x000055555559c140 in std::vector<restinio::writable_item_t, std::allocator<restinio::writable_item_t> >::emplace_back<restinio::writable_item_t> (this=0x7fffee7f92a0, __args#0=...) at /usr/include/c++/8.2.1/bits/vector.tcc:103
#4  0x00005555555917c2 in restinio::response_builder_t<restinio::chunked_output_t>::append_chunk(restinio::writable_item_t) & (this=0x7fffee7f9220, chunk=...) at /usr/local/include/restinio/message_builders.hpp:674
n0t ~ $ http 127.0.0.1:8080/wowo
HTTP/1.1 200 OK
Access-Control-Allow-Origin: *
Connection: keep-alive
Content-Type: application/json
Server: RESTinio
Transfer-Encoding: chunked

http: error: ChunkedEncodingError: ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
eao197 commented 5 years ago

Unfortunately, I don't know anything about your this->dhtNode->get(). Maybe it calls its callbacks in the context of another thread. In that case all references captured in those callbacks can be invalid, and you have to create an additional object that will outlive the call to this->dhtNode->get(). It can look like this:

struct GetContext : public std::enable_shared_from_this<GetContext> {
   DhtProxyServer * server_;
   dht::InfoHash infoHash_;
   restinio::response_builder_t<response_t> response_;

   GetContext(DhtProxyServer * server, dht::InfoHash infoHash, restinio::response_builder_t<response_t> response)
      : server_{server}, infoHash_{std::move(infoHash)}, response_{std::move(response)}
   {}
};

request_status DhtProxyServer::get(restinio::request_handle_t request,
                                   restinio::router::route_params_t params)
{
    printf("Connection Id: %lu\n", request->connection_id());

    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    auto context = std::make_shared<GetContext>(this, std::move(infoHash), request->create_response<response_t>());

    this->scheduleRespDispatch([context]{
        context->server_->initHttpResponse(context->response_);
        context->response_.flush();
        // `this` is not captured by the enclosing lambda, so go through the context
        context->server_->dhtNode->get(context->infoHash_, [context] (const dht::Sp<dht::Value>& value){
            auto output = Json::writeString(context->server_->jsonBuilder, value->toJson()) + "\n";
            context->response_.append_chunk(output);
            context->response_.flush();
            return true;
        },
        [context] (bool /*ok*/){
            context->response_.done();
        });
    });
    printf("finishing get function \n");
    return restinio::request_handling_status_t::accepted;
}

Please note that the context object is captured by value, not by reference, so every callback holds its own share of ownership.
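
To make the by-value point concrete, here is a minimal sketch that is independent of RESTinio and OpenDHT. `Context` is a hypothetical stand-in for GetContext, and the function names are invented for illustration. It only shows how the capture mode affects the shared_ptr reference count.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-in for GetContext; only its lifetime matters here.
struct Context { int chunks_sent = 0; };

// Captures the shared_ptr by value: the callback owns a share of the context.
long count_with_value_capture() {
    auto context = std::make_shared<Context>();
    std::function<void()> callback = [context] { ++context->chunks_sent; };
    return context.use_count(); // the local pointer plus the copy in the callback
}

// Captures the shared_ptr by reference: the callback owns nothing and
// dangles as soon as the local pointer goes out of scope.
long count_with_reference_capture() {
    auto context = std::make_shared<Context>();
    auto callback = [&context] { ++context->chunks_sent; };
    (void)callback; // never invoked; only the reference count is of interest
    return context.use_count(); // only the local pointer
}
```

With the by-value capture the count is 2, so the context stays alive for as long as the callback exists. With the by-reference capture it stays at 1.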

AmarOk1412 commented 5 years ago

Just to complete a bit (OpenDHT contributor here)

Callbacks for DHT operations have to be called on a single thread (aka our event loop). this->dhtNode->get() will put the callback into the pending operations and the event loop will do the callback later. So yeah, the following code will be incorrect:

request_status DhtProxyServer::get(restinio::request_handle_t request,
                                   restinio::router::route_params_t params)
{
    printf("Connection Id: %lu\n", request->connection_id());

    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    this->scheduleRespDispatch([this, infoHash, request]{
        auto response = this->initHttpResponse(request->create_response<response_t>());
        response.flush();
        this->dhtNode->get(infoHash, [this, &response] (const dht::Sp<dht::Value>& value){
            // response can be invalid
            auto output = Json::writeString(this->jsonBuilder, value->toJson()) + "\n";
            response.append_chunk(output);
            response.flush();
            return true;
        },
        [&] (bool /*ok*/){
            response.done();
        });
       // response deleted here!
    });
    printf("finishing get function \n");
    return restinio::request_handling_status_t::accepted;
}

Because response will be destroyed before the callback runs, you need to wrap the object in a shared_ptr or pass it by value (if copying the response is not a problem).
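
The effect of the deferred callback can be reproduced without either library. In the sketch below, `FakeDht` and `FakeResponse` are hypothetical stand-ins: `get()` only queues the callback, and `run_pending()` plays the role of the event loop invoking it later. None of these names come from OpenDHT or RESTinio.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for the DHT event loop: get() only queues the
// callback, run_pending() invokes it later, as the loop thread would.
struct FakeDht {
    std::vector<std::function<void(const std::string&)>> pending;

    void get(std::function<void(const std::string&)> on_value) {
        pending.push_back(std::move(on_value));
    }

    void run_pending(const std::string& value) {
        for (auto& callback : pending) callback(value);
        pending.clear(); // dropping the callbacks releases what they own
    }
};

// Hypothetical stand-in for the response builder.
struct FakeResponse {
    std::string body;
    void append_chunk(const std::string& chunk) { body += chunk; }
};

// Mirrors the shape of the handler: the scope that creates the response
// ends before the DHT callback runs.
std::weak_ptr<FakeResponse> start_get(FakeDht& dht) {
    auto response = std::make_shared<FakeResponse>();
    dht.get([response](const std::string& value) {
        response->append_chunk(value + "\n");
    });
    return response; // weak observer only; ownership stays in the callback
}

// Returns the body written by the deferred callback.
std::string body_after_callback() {
    FakeDht dht;
    auto observed = start_get(dht).lock(); // alive: the queued callback owns it
    if (!observed) return "<expired>";
    dht.run_pending("waffles");
    return observed->body;
}

// True if the response lives until the callbacks are dropped, then is freed.
bool released_after_callbacks() {
    FakeDht dht;
    auto watcher = start_get(dht);
    bool alive_before = !watcher.expired();
    dht.run_pending("x");
    return alive_before && watcher.expired();
}
```

Capturing `&response` instead would leave the queued callback with a reference to an object that no longer exists once `start_get()` returns, which is the situation in the segfaulting handler.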

eao197 commented 5 years ago

Callbacks for DHT operations have to be called on a single thread (aka our event loop).

If RESTinio and DHT work on separate threads then there can be a simpler solution with an additional object (without additional threads and thread-safe message queues). Just:

struct GetContext : public std::enable_shared_from_this<GetContext> {
   DhtProxyServer * server_;
   dht::InfoHash infoHash_;
   restinio::response_builder_t<response_t> response_;

   GetContext(DhtProxyServer * server, dht::InfoHash infoHash, restinio::response_builder_t<response_t> response)
      : server_{server}, infoHash_{std::move(infoHash)}, response_{std::move(response)}
   {}
};

request_status DhtProxyServer::get(restinio::request_handle_t request,
                                   restinio::router::route_params_t params)
{
    printf("Connection Id: %lu\n", request->connection_id());

    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    auto context = std::make_shared<GetContext>(this, std::move(infoHash), request->create_response<response_t>());
    context->server_->initHttpResponse(context->response_);
    context->response_.flush();
    this->dhtNode->get(context->infoHash_, [context] (const dht::Sp<dht::Value>& value){
         auto output = Json::writeString(context->server_->jsonBuilder, value->toJson()) + "\n";
         context->response_.append_chunk(output);
         context->response_.flush();
         return true;
     },
     [context] (bool /*ok*/){
         context->response_.done();
     });

    printf("finishing get function \n");
    return restinio::request_handling_status_t::accepted;
}

I think it should work.

binarytrails commented 5 years ago

Yes, I really understood the power of the shared pointers in such a complex situation. I ended up building the response with make_shared and it works!!

request_status DhtProxyServer::get(restinio::request_handle_t request,
                                   restinio::router::route_params_t params)
{
    printf("Connection Id: %lu\n", request->connection_id());

    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    this->scheduleRespDispatch([this, infoHash, request]{
        auto response = std::make_shared<restinio::response_builder_t<response_t>>(
            this->initHttpResponse(request->create_response<response_t>()));
        response->flush();
        // callback per value
        this->dhtNode->get(infoHash, [this, response] (const dht::Sp<dht::Value>& value){
            auto output = Json::writeString(this->jsonBuilder, value->toJson()) + "\n";
            response->append_chunk(output);
            response->flush();
            return true;
        },
        [response] (bool /*ok*/){
            response->done();
        });
    });
    printf("finishing get function done=%i\n", done_action);
    return restinio::request_handling_status_t::accepted;
}
[2019-05-15 14:56:55.202] TRACE: [connection:1] start connection with 127.0.0.1:37222
[2019-05-15 14:56:55.202] TRACE: [connection:1] start waiting for request
[2019-05-15 14:56:55.202] TRACE: [connection:1] continue reading request
[2019-05-15 14:56:55.202] TRACE: [connection:1] received 139 bytes
[2019-05-15 14:56:55.202] TRACE: [connection:1] request received (#0): GET /wowo
Connection Id: 1
adding request to execute
finishing get function done=0
dispatchResponses woken up, has 1 resps
executing the request
[2019-05-15 14:56:55.202] TRACE: [connection:1] append response (#0), flags: { not_final_parts, connection_keepalive }, write group size: 1
[2019-05-15 14:56:55.202] TRACE: [connection:1] start next write group for response (#0), size: 1
[2019-05-15 14:56:55.202] TRACE: [connection:1] start response (#0): HTTP/1.1 200 OK
[2019-05-15 14:56:55.202] TRACE: [connection:1] sending resp data, buf count: 1, total size: 153
[2019-05-15 14:56:55.202] TRACE: [connection:1] outgoing data was sent: 153 bytes
[2019-05-15 14:56:55.202] TRACE: [connection:1] finishing current write group
[2019-05-15 14:56:55.202] TRACE: [connection:1] should keep alive
[2019-05-15 14:56:55.279] TRACE: [connection:1] append response (#0), flags: { not_final_parts, connection_keepalive }, write group size: 3
[2019-05-15 14:56:55.279] TRACE: [connection:1] start next write group for response (#0), size: 3
[2019-05-15 14:56:55.279] TRACE: [connection:1] sending resp data, buf count: 3, total size: 62
[2019-05-15 14:56:55.279] TRACE: [connection:1] outgoing data was sent: 62 bytes
[2019-05-15 14:56:55.279] TRACE: [connection:1] finishing current write group
[2019-05-15 14:56:55.279] TRACE: [connection:1] should keep alive
[2019-05-15 14:56:55.366] TRACE: [connection:1] append response (#0), flags: { not_final_parts, connection_keepalive }, write group size: 3
[2019-05-15 14:56:55.366] TRACE: [connection:1] start next write group for response (#0), size: 3
[2019-05-15 14:56:55.366] TRACE: [connection:1] sending resp data, buf count: 3, total size: 62
[2019-05-15 14:56:55.367] TRACE: [connection:1] outgoing data was sent: 62 bytes
[2019-05-15 14:56:55.367] TRACE: [connection:1] finishing current write group
[2019-05-15 14:56:55.367] TRACE: [connection:1] should keep alive
[2019-05-15 14:56:57.279] TRACE: [connection:1] append response (#0), flags: { final_parts, connection_keepalive }, write group size: 1
[2019-05-15 14:56:57.279] TRACE: [connection:1] start next write group for response (#0), size: 1
[2019-05-15 14:56:57.279] TRACE: [connection:1] sending resp data, buf count: 1, total size: 5
[2019-05-15 14:56:57.280] TRACE: [connection:1] outgoing data was sent: 5 bytes
[2019-05-15 14:56:57.280] TRACE: [connection:1] finishing current write group
[2019-05-15 14:56:57.280] TRACE: [connection:1] should keep alive
[2019-05-15 14:56:57.280] TRACE: [connection:1] start waiting for request
[2019-05-15 14:56:57.280] TRACE: [connection:1] continue reading request
[2019-05-15 14:56:57.287] TRACE: [connection:1] EOF and no request, close connection
[2019-05-15 14:56:57.287] TRACE: [connection:1] close
[2019-05-15 14:56:57.288] TRACE: [connection:1] close: close socket
[2019-05-15 14:56:57.288] TRACE: [connection:1] close: timer canceled
[2019-05-15 14:56:57.288] TRACE: [connection:1] close: reset responses data
[2019-05-15 14:56:57.288] TRACE: [connection:1] destructor called
[2019-05-15 14:57:02.478] TRACE: closing server on 0.0.0.0:8080
n0t ~ $ http 127.0.0.1:8080/wowo
HTTP/1.1 200 OK
Access-Control-Allow-Origin: *
Connection: keep-alive
Content-Type: application/json
Server: RESTinio
Transfer-Encoding: chunked

{"data":"d293bw==","id":"4519972680523186013","type":0}
{"data":"d293bw==","id":"9900257124007050156","type":0}
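
The sizes in the trace above are consistent with HTTP/1.1 chunked framing. Each JSON line plus its trailing "\n" is 56 bytes, and wrapping it as a chunk gives 62 bytes, which matches "total size: 62". The final 5-byte write matches the terminating zero-length chunk. A minimal sketch of that arithmetic follows. `frame_chunk` and `last_chunk` are hypothetical helpers, not RESTinio functions, and it assumes the payload is exactly the line shown in the output.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <string>

// Frames one payload as an HTTP/1.1 chunk: hex size, CRLF, data, CRLF.
std::string frame_chunk(const std::string& payload) {
    char size_line[32];
    std::snprintf(size_line, sizeof size_line, "%zx\r\n", payload.size());
    return std::string(size_line) + payload + "\r\n";
}

// The zero-length chunk that terminates a chunked body (no trailers).
std::string last_chunk() {
    return "0\r\n\r\n";
}
```

The three parts of a framed chunk (size line, data, closing CRLF) would also account for the "buf count: 3" entries in the log, though that is an inference from the trace rather than something confirmed in this thread.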

Thank you so much for the help @eao197, until next challenge! :octopus: