pistacheio / pistache

A high-performance REST toolkit written in C++
https://pistacheio.github.io/pistache/
Apache License 2.0

single async response multithreads #1083

Closed pintauroo closed 1 year ago

pintauroo commented 1 year ago

I am trying to write a C++ Pistache server that, on a specific endpoint, has to contact another Pistache server. This is the scenario:

client -> server1 -> server2

client <- server1 <- server2

I am having problems waiting for the response in server1 and sending it back to the client asynchronously.

I am sharing my server1 code to show you how I implemented it:

void doSmth(const Rest::Request& request, Http::ResponseWriter httpResponse){
        auto resp_srv2 = client.post(addr).body(json).send();    
        resp_srv2.then(
        [&](Http::Response response) {
            do_smth;
        },
        [&](std::exception_ptr exc) {
            PrintException excPrinter;
            excPrinter(exc);
    });

    Async::Barrier<Http::Response> barrier(resp_srv2);
    barrier.wait_for(std::chrono::seconds(1));        
    httpResponse.send(response_code);
}

In particular, this code works only if I set a single thread for server1. If I use more threads, it crashes on the first request. Of course, the performance is not that good.

If I try to follow your example, I am able to use more threads, but then there is a problem (overflow?) with the request count: on my server, I can reach around 28k requests sent from the client to server1, and after that it crashes.

Also, if I do something like:

    resp_srv2.then(
        [&](Http::Response response) {
            httpResponse.send(r.response_code);
        },
        [&](std::exception_ptr exc) {
            PrintException excPrinter;
            excPrinter(exc);
        });

it does not work well, giving back a segmentation fault when the method is called.

Am I doing something wrong? Please help me fix this issue.

Fabio3rs commented 1 year ago

In this case the lambda outlives the variables it captures by reference, in particular the local httpResponse variable. I think the first step is to move the response to the heap, or to move the response into a mutable lambda and put that lambda on the heap.

Maybe something like this, moving the response to the heap:

    using namespace Pistache;
    auto resp_srv2 = client.get(address).send();

    auto responseheap =
        std::make_shared<Http::ResponseWriter>(std::move(response));

    resp_srv2.then(
        [responseheap](Http::Response srvresponse) {
            // set mime type...
            responseheap->send(srvresponse.code(), srvresponse.body());
        },
        [responseheap](std::exception_ptr exc) {
            PrintException excPrinter;
            excPrinter(exc);

            // set mime type...
            responseheap->send(Pistache::Http::Code::Internal_Server_Error, "{}");
        });

Or using a mutable lambda on the heap:

    using namespace Pistache;
    auto resp_srv2 = client.get(address).send();

    auto fn = [cliresp =
                   std::move(response)](Http::Response srvresponse) mutable {
        cliresp.send(srvresponse.code(), srvresponse.body());
    };

    auto then = std::make_shared<decltype(fn)>(std::move(fn));

    resp_srv2.then(
        [then](Http::Response srvresponse) { (*then)(std::move(srvresponse)); },
        [&](std::exception_ptr exc) {
            PrintException excPrinter;
            excPrinter(exc);
        });
pintauroo commented 1 year ago

Thanks for the answer!

I tried both, but in the first case, I got a segmentation fault.

In the second one, I got: double free or corruption (fasttop) and core dumped.

I guess that in this way, it is hard to handle the heap memory efficiently using multiple threads.

Fabio3rs commented 1 year ago

> Thanks for the answer!
>
> I tried both, but in the first case, I got a segmentation fault.
>
> In the second one, I got: double free or corruption (fasttop) and core dumped.
>
> I guess that in this way, it is hard to handle the heap memory efficiently using multiple threads.

I’m happy to help! :)

Which compiler are you using? Are you using memory sanitizers? Here the code seems to work.

pintauroo commented 1 year ago

Ok sorry I went straight to the worst case.

The code actually works with a little load. As soon as I try to increase the load using JMeter, I have the problems I reported before. Since I need to test with many users, I can't have these problems.

On my laptop I have gcc version 9.4.0 (Ubuntu 9.4.0-1ubuntu1~20.04.1). I didn't test it elsewhere yet, but as a first step the code should work with at least 200 users (the working implementation I showed in the first message can handle around 100 users).

Fabio3rs commented 1 year ago

No problem. Thanks!

I think there is some static instance being reused or something like that. I will do some tests with high load and with GDB attached.

Fabio3rs commented 1 year ago

Sorry, I could not reproduce the problem :(

I tried Apache ab and JMeter to test with high concurrency numbers, for example: ab -n 20000 -c 500 -k -m GET http://127.0.0.1:9000/verify_another_api. I compiled with AddressSanitizer too but got no messages.

I am testing only with the example code. The Pistache I am using was compiled from the master branch. Could it maybe be a bug in the HTTP client's code?

I created a repository with my last try. I am using some wrappers, but I don't think they affect the result in this case: https://github.com/Fabio3rs/TestPistacheAsyncHttpClient/blob/main/src/web/webmgr.cpp

I run the first server, and the second one with PORT=1234 build/bin/testPistacheClient.

pintauroo commented 1 year ago

Ok so you are not having problems at all?

I can guess two differences: I am sending a POST request, and maybe the processing time of my server2 is too long?

Fabio3rs commented 1 year ago

So far, no problem.

I tested using a 10 ms and a 1 second wait. It could be the response size too; I am testing with a small message.

pintauroo commented 1 year ago

Yes, my response message is small, but the server2 processing takes a while. I am basically using their examples; I didn't change that much, specifically rest_server.cc to start the server. Might something be wrong there?

Fabio3rs commented 1 year ago

I think there is some problem in the code I posted as an example; memory leaks happened here now:

SUMMARY: AddressSanitizer: 82048 byte(s) leaked in 1240 allocation(s).

I will revise my code

Fabio3rs commented 1 year ago

Sorry for the double post.

I think I found the error in my code:

The response shared_ptr is captured by value and was not being released. The solution I found is to mark the lambda as mutable and move the response from the lambda's storage into a local variable, so the shared_ptr to the heap-allocated response is released when the local goes out of scope.


        resp_srv2.then(
            [responseheap](Response srvresponse) mutable {
                auto response = std::move(responseheap);
                // set mime type...
                response->send(srvresponse.code(), srvresponse.body());
            },
            [responseheap](std::exception_ptr exc) mutable {
                auto response = std::move(responseheap);
                PrintException excPrinter;
                excPrinter(std::move(exc));

                // set mime type...
                response->send(Code::Internal_Server_Error, "{}");
            });

The segmentation fault has not occurred here yet, but I think it is this: if your HTTP test client disconnects in the middle of the connection, Pistache removes the last reference to the socket peer FD's shared_ptr, and when the code in the callback writes to the socket it can get an invalid weak_ptr: https://github.com/pistacheio/pistache/blob/master/src/common/http.cc#L876

Maybe if (peer.expired()) is false at one moment, but another thread removes the last reference to the shared_ptr between those instructions, and then peer.lock() returns a null shared_ptr.
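
For illustration, here is a generic sketch of that kind of weak_ptr race (not Pistache's actual code; the function and types are made up for the example):

    #include <memory>

    void writeToPeer(std::weak_ptr<int> peer) {
        // Racy: another thread may drop the last shared_ptr between
        // the expired() check and the lock() call.
        if (!peer.expired()) {
            auto locked = peer.lock(); // can still be nullptr here
            // dereferencing *locked here could crash
        }

        // Safer: lock once and test the result; the object stays
        // alive for as long as `locked` is in scope.
        if (auto locked = peer.lock()) {
            // safe to use *locked here
        }
    }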

I think it can be solved by manually getting an instance of the peer FD's shared_ptr and capturing it in the lambda.

        auto peerlocked = responseheap->getPeer();

        resp_srv2.then(
            [responseheap, peerlocked](Response srvresponse) mutable {
                auto peerfd = std::move(peerlocked);
                auto response = std::move(responseheap);
                // set mime type...
                response->send(srvresponse.code(), srvresponse.body());
            },
            [responseheap, peerlocked](std::exception_ptr exc) mutable {
                auto peerfd = std::move(peerlocked);
                auto response = std::move(responseheap);
                PrintException excPrinter;
                excPrinter(std::move(exc));

                // set mime type...
                response->send(Code::Internal_Server_Error, "{}");
            });

Or by getting the peer inside the lambda and testing whether it is nullptr (or maybe the Pistache code throws an exception in this situation).

        resp_srv2.then(
            [responseheap](Response srvresponse) mutable {
                auto response = std::move(responseheap);
                auto peerlocked = response->getPeer();

                if (!peerlocked) {
                    // Client disconnected
                    return;
                }
                // set mime type...
                response->send(srvresponse.code(), srvresponse.body());
            },
            [responseheap](std::exception_ptr exc) mutable {
                auto response = std::move(responseheap);
                auto peerlocked = response->getPeer();

                if (!peerlocked) {
                    // Client disconnected
                    return;
                }

                PrintException excPrinter;
                excPrinter(std::move(exc));

                // set mime type...
                response->send(Code::Internal_Server_Error, "{}");
            });

The updated code https://github.com/Fabio3rs/TestPistacheAsyncHttpClient/blob/main/src/web/webmgr.cpp#L32

pintauroo commented 1 year ago

Ok, I was trying to reproduce it but I am still struggling. I noticed that I am not using the experimental Pistache implementation, so I can't use:

auto peerlocked = responseheap->getPeer();

My code is not able to find the function name even after exporting the included library, giving the following messages back:

error: ‘using element_type = class Pistache::Http::ResponseWriter’ {aka ‘class Pistache::Http::ResponseWriter’} has no member named ‘getPeer’

In lambda function:

error: ‘peerlocked’ is not captured

I also tried to compile and install from source again but still the same issue.

I would like to ask you one last favour: could you please try to adapt your code to the rest_server.cc example, considering my issues? It would make my life a lot easier!

Still thanks for your help

Fabio3rs commented 1 year ago

> I also tried to compile and install from source again but still the same issue.

I think that if you have Pistache installed via apt, the headers of the packaged version may take priority, so the headers of the version compiled from source will not be used.

> I would like to ask you one last favour. Please could you try to adapt your code to the [rest_server.cc]

I will look at what functions the packaged version provides and try to adapt the code.

Is the apt version from this PPA: ppa:pistache+team/unstable? I had a GitHub workflow running for my code and it compiled with the apt version (https://github.com/Fabio3rs/TestPistacheAsyncHttpClient/blob/main/.github/workflows/BuildTest.yml): sudo add-apt-repository ppa:pistache+team/unstable && sudo apt update && sudo apt install libpistache-dev

> Still thanks for your help

You are welcome, I'm glad to help :)

Fabio3rs commented 1 year ago

There is another method in the ResponseWriter called just "peer"; as I don't know the exact version you are using, I used that one instead.

The method with the code is doAsyncRequest.

I hope it works. I am waiting for your feedback.

I was thinking, and maybe the memory leak I saw was a false positive; maybe I closed the server while some connections were still open or in the queue. I need to look more carefully.

/*
 * SPDX-FileCopyrightText: 2016 Mathieu Stefani
 *
 * SPDX-License-Identifier: Apache-2.0
 */

/*
   Mathieu Stefani, 07 février 2016

   Example of a REST endpoint with routing
*/

#include <algorithm>
#include <chrono>
#include <exception>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <pistache/client.h>
#include <pistache/endpoint.h>
#include <pistache/http.h>
#include <pistache/router.h>

using namespace Pistache;

void printCookies(const Http::Request &req) {
  auto cookies = req.cookies();
  std::cout << "Cookies: [" << std::endl;
  const std::string indent(4, ' ');
  for (const auto &c : cookies) {
    std::cout << indent << c.name << " = " << c.value << std::endl;
  }
  std::cout << "]" << std::endl;
}

namespace Generic {

void handleReady(const Rest::Request &, Http::ResponseWriter response) {
  response.send(Http::Code::Ok, "1");
}

} // namespace Generic

using HttpClient = Http::Experimental::Client;

static HttpClient client;

class StatsEndpoint {
public:
  explicit StatsEndpoint(Address addr)
      : httpEndpoint(std::make_shared<Http::Endpoint>(addr)) {}

  void init(size_t thr = 2) {
    auto opts = Http::Endpoint::options().threads(static_cast<int>(thr));
    httpEndpoint->init(opts);
    setupRoutes();
  }

  void start() {
    httpEndpoint->setHandler(router.handler());
    httpEndpoint->serve();
  }

private:
  void setupRoutes() {
    using namespace Rest;

    Routes::Post(router, "/record/:name/:value?",
                 Routes::bind(&StatsEndpoint::doRecordMetric, this));
    Routes::Get(router, "/value/:name",
                Routes::bind(&StatsEndpoint::doGetMetric, this));
    Routes::Get(router, "/ready", Routes::bind(&Generic::handleReady));
    Routes::Get(router, "/auth", Routes::bind(&StatsEndpoint::doAuth, this));
    Routes::Get(router, "/async_request",
                Routes::bind(&StatsEndpoint::doAsyncRequest, this));
    Routes::Get(router, "/another_address",
                Routes::bind(&StatsEndpoint::doDelayedResponse, this));
  }

  void doRecordMetric(const Rest::Request &request,
                      Http::ResponseWriter response) {
    auto name = request.param(":name").as<std::string>();

    Guard guard(metricsLock);
    auto it =
        std::find_if(metrics.begin(), metrics.end(), [&](const Metric &metric) {
          return metric.name() == name;
        });

    int val = 1;
    if (request.hasParam(":value")) {
      auto value = request.param(":value");
      val = value.as<int>();
    }

    if (it == std::end(metrics)) {
      metrics.emplace_back(std::move(name), val);
      response.send(Http::Code::Created, std::to_string(val));
    } else {
      auto &metric = *it;
      metric.incr(val);
      response.send(Http::Code::Ok, std::to_string(metric.value()));
    }
  }

  void doGetMetric(const Rest::Request &request,
                   Http::ResponseWriter response) {
    auto name = request.param(":name").as<std::string>();

    Guard guard(metricsLock);
    auto it =
        std::find_if(metrics.begin(), metrics.end(), [&](const Metric &metric) {
          return metric.name() == name;
        });

    if (it == std::end(metrics)) {
      response.send(Http::Code::Not_Found, "Metric does not exist");
    } else {
      const auto &metric = *it;
      response.send(Http::Code::Ok, std::to_string(metric.value()));
    }
  }

  void doAuth(const Rest::Request &request, Http::ResponseWriter response) {
    printCookies(request);
    response.cookies().add(Http::Cookie("lang", "en-US"));
    response.send(Http::Code::Ok);
  }

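  // Helper: returns the peer of this response, or nullptr if
  // response.peer() throws (e.g. the client has already disconnected).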
  static auto getPeer(const Http::ResponseWriter &response)
      -> std::shared_ptr<Tcp::Peer> {
    try {
      return response.peer();
    } catch (...) {
    }
    return nullptr;
  }

  void doAsyncRequest(const Rest::Request &request,
                      Http::ResponseWriter response) {

    using namespace Pistache::Http;

    std::string address = "http://127.0.0.1:1234/another_address";
    auto resp_srv2 = client.get(address).send();

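    // Move the ResponseWriter to the heap so it outlives this handler
    // and can still be used from the asynchronous callbacks below.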
    auto responseHeap = std::make_shared<ResponseWriter>(std::move(response));

    resp_srv2.then(
        [responseHeap](Response srvresponse) mutable {
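          // Move the captured shared_ptr into a local so the
          // ResponseWriter is released as soon as this callback finishes.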
          auto response = std::move(responseHeap);
          auto peerLocked = getPeer(*response);
          if (!peerLocked) {
            // Client disconnected
            return;
          }

          response->send(srvresponse.code(), srvresponse.body());
        },
        [responseHeap](std::exception_ptr exc) mutable {
          auto response = std::move(responseHeap);
          auto peerLocked = getPeer(*response);

          if (!peerLocked) {
            // Client disconnected
            return;
          }

          PrintException excPrinter;
          excPrinter(std::move(exc));

          // set mime type...
          response->send(Code::Internal_Server_Error, "{}");
        });
  }

  void doDelayedResponse(const Rest::Request &request,
                         Http::ResponseWriter response) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::string strresp;

    strresp.resize(256, 'A');

    response.send(Http::Code::Ok, strresp);
  }

  class Metric {
  public:
    explicit Metric(std::string name, int initialValue = 1)
        : name_(std::move(name)), value_(initialValue) {}

    int incr(int n = 1) {
      int old = value_;
      value_ += n;
      return old;
    }

    int value() const { return value_; }

    const std::string &name() const { return name_; }

  private:
    std::string name_;
    int value_;
  };

  using Lock = std::mutex;
  using Guard = std::lock_guard<Lock>;
  Lock metricsLock;
  std::vector<Metric> metrics;

  std::shared_ptr<Http::Endpoint> httpEndpoint;
  Rest::Router router;
};

int main(int argc, char *argv[]) {
  Port port(9080);

  client.init(HttpClient::options().threads(8));

  int thr = 2;

  if (argc >= 2) {
    port = static_cast<uint16_t>(std::stol(argv[1]));

    if (argc == 3)
      thr = std::stoi(argv[2]);
  }

  Address addr(Ipv4::any(), port);

  std::cout << "Cores = " << hardware_concurrency() << std::endl;
  std::cout << "Using " << thr << " threads" << std::endl;

  StatsEndpoint stats(addr);

  stats.init(thr);
  stats.start();
}
pintauroo commented 1 year ago

Oh man, so sorry to report that I am still getting the core dumped message. I tried it also on my server. It works for a while with a maximum of 200 threads and then it generates the core dump. Reducing the thread number to 50, it lasts longer, but still less than one minute. I still can't figure out the real problem. :(

Fabio3rs commented 1 year ago

:(

Can you run with GDB attached? Are you using 200 threads in each of the HTTP server and HTTP client? I didn't test with that many threads in each; it can be a system ulimit problem.

Running directly in the terminal here with 200 threads, it won't work without increasing the ulimit: ulimit -n 2048. ulimit has other parameters; it may be necessary to increase them too.

pintauroo commented 1 year ago

This is my current ulimit. It is a powerful server:

ulimit -u 256996
ulimit -n 1048576
ulimit -f unlimited
ulimit -c 0

Yes, each HTTP request is sent every 100-500 ms, and the number of users is between 100 and 200.

I build it in a Docker container.

I have never played with GDB, so it would take some time to do it.

Fabio3rs commented 1 year ago

Built and running in a Docker container? Can you post your Dockerfile? I can try running it on an Ubuntu server in a container to be as close as possible to your setup.

Basic start with GDB:

gdb executable_path
# gdb will open a debug command line
# you can pass args to the start command, for example: start 9000 200
start

# gdb normally breaks on the main entry point; use the command continue
continue

# when the core dump occurs, get the backtrace with bt
bt

# quit to exit gdb
quit
pintauroo commented 1 year ago

free(): double free detected in tcache 2

Thread 20 "server_run" received signal SIGABRT, Aborted.
--Type <RET> for more, q to quit, c to continue without paging--bt

[Switching to Thread 0x7fffee908700 (LWP 52511)]
__GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:50
50      ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.

This is on my laptop, following your instructions. (Sorry, I can't share the private Docker setup.)

pintauroo commented 1 year ago

Ok wait maybe I got it.

Fabio3rs commented 1 year ago

If the program is compiled in release mode, most of the time the stack trace cannot be recovered.

If you are compiling manually, the flag -g enables debug info and -fno-omit-frame-pointer tells the compiler not to optimize away the stack frame pointer.

No problem.

> Ok wait maybe I got it.

Ok, waiting

pintauroo commented 1 year ago

client.init(Http::Client::options().threads(8)); changes something, but it is still the same after a while... I don't know what the thread number is used for. Like this, the test works for a while and then crashes.

Fabio3rs commented 1 year ago

How much time does the second server take to respond?

Can you compile with the latest Pistache version? Use add_subdirectory in CMakeLists.txt with a git submodule, and if necessary include_directories(BEFORE SYSTEM pistachesubmodule/includes).

I did an example https://github.com/Fabio3rs/PistacheAsyncServerExampleWithSubmodule

pintauroo commented 1 year ago

Dear @Fabio3rs, sorry, but I haven't been able to run other tests until now. In a new environment with Ubuntu 22 and Pistache installed from apt, I am having this issue:

malloc(): unaligned tcache chunk detected 15103 Aborted (core dumped)

or with the old CMake:

munmap_chunk(): invalid pointer 9898 Aborted (core dumped)

This is the new CMake file:

set(projectname server)
set(SOURCE_FILES server.cc)

cmake_minimum_required(VERSION 3.6.0)
project(${projectname} VERSION 0.1.1)

set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_DEFAULT 17)
set(CXX17 ON)
set(CMAKE_CXX_EXTENSIONS Off)
set(CMAKE_C_EXTENSIONS Off)

include(CPack)
find_package(PkgConfig)

message("Finding pistache")

find_package(PkgConfig)
pkg_check_modules(Pistache REQUIRED IMPORTED_TARGET libpistache)

# Create our executable file from our source files
add_executable(${projectname} ${SOURCE_FILES})
target_link_libraries(${projectname} pthread PkgConfig::Pistache)

Could this be helpful information?

I tried GDB, and using the server you posted I am getting back:

double free or corruption (fasttop)

Thread 17 "server" received signal SIGABRT, Aborted.
[Switching to Thread 0x7fffefc81640 (LWP 4973)]
__pthread_kill_implementation (no_tid=0, signo=6, threadid=140737216255552) at ./nptl/pthread_kill.c:44
44      ./nptl/pthread_kill.c: No such file or directory.

Also, it is running outside Docker.

Fabio3rs commented 1 year ago

Dear @pintauroo

> Dear @Fabio3rs, sorry, but I haven't been able to run other tests until now.

No problem, do the tests whenever you feel comfortable.

> malloc(): unaligned tcache chunk detected 15103 Aborted (core dumped)
>
> or with the old CMake:
>
> munmap_chunk(): invalid pointer 9898 Aborted (core dumped)

It seems you are having problems with various setups. I tried other combinations of compilers (clang 12, gcc 9, gcc 11) and operating systems (Mint 20.3, Ubuntu 20.04, Ubuntu 22.04), but I can't reproduce the problem with my test code. Are you doing some processing of the second server's response with extra code in the first server? There could be some concurrency problem there. I am testing only with the example code.

How long does it take for the segmentation fault to occur? Maybe the next step is to use AddressSanitizer in the compile and linker options, since GDB is not catching the problem's trace.

The tests I am doing are with this code and the new test CMake: https://github.com/Fabio3rs/PistacheAsyncServerExampleWithSubmodule/blob/main/rest_server.cc https://www.youtube.com/watch?v=F54tBEH85Ms

pintauroo commented 1 year ago

Ok, thanks a lot! This time it worked. I re-implemented everything, and probably I had some buggy code... so we will never know what my issue was :(

I suggest that @pistacheio use your example as a library example. It was very helpful to me.

Thanks again for your support!
