drogonframework / drogon

Drogon: A C++14/17/20 based HTTP web application framework running on Linux/macOS/Unix/Windows
MIT License
11.62k stars 1.12k forks source link

压力测试时候出现大量Network failure #1795

Closed nqf closed 1 year ago

nqf commented 1 year ago

我使用 3000 个 HttpClient 发送测试请求。当服务端处理很慢的时候(服务端工作线程阻塞), 客户端就会出现大量的 Network failure, 但是服务端并没有关闭 socket 连接。什么情况下会出现 Network failure 错误?

an-tao commented 1 year ago

客户端可能设了超时吧

nqf commented 1 year ago

客户端可能设了超时吧

#include <mutex>
#include <queue>
#include <random>

#include <drogon/drogon.h>
#include <fmt/format.h>
#include <gflags/gflags.h>
#include <spdlog/spdlog.h>
#include <trantor/net/EventLoopThreadPool.h>
#include <inja/inja.hpp>
#include <yaml_cpp_struct.hpp>

// One request template read from the YAML config; createRequest() expands it
// into concrete HTTP requests.
struct Packet {
    // Path set on every request generated from this packet.
    std::string http_path;
    // Body template; rendered through inja with the variable "buten_cd" bound
    // to each key of butens_request_num.
    std::string http_body;
    // Maps a "buten_cd" value to the number of requests to generate for it.
    std::unordered_map<std::string, int32_t> butens_request_num;
};
// NOTE(review): presumably registers Packet's fields with yaml_cpp_struct so
// that from_yaml<> can populate them — confirm against the library docs.
YCS_ADD_STRUCT(Packet, http_path, http_body, butens_request_num)

// Root of the tester configuration, loaded in main() via
// yaml_cpp_struct::from_yaml<Config>(FLAGS_cfg).
struct Config {
    // Request templates; each one is expanded by createRequest().
    std::vector<Packet> request_packet;
};
// NOTE(review): presumably registers Config's fields with yaml_cpp_struct —
// confirm against the library docs.
YCS_ADD_STRUCT(Config, request_packet)

// Command-line flags (gflags).
// Size of the event-loop thread pool that the HTTP clients are spread over.
DEFINE_int32(threads, 8, "io threads");
// Number of drogon::HttpClient instances created; one sender coroutine is started per client.
DEFINE_int32(http_client_count, 32, "Number of http client");
// Host string passed to drogon::HttpClient::newHttpClient().
DEFINE_string(host, "http://127.0.0.1:8848", "host");
// Path of the YAML file loaded into Config.
DEFINE_string(cfg, "../cfg/ecgw.yaml", "config file");

/// Returns the current std::chrono::system_clock reading as whole
/// milliseconds elapsed since that clock's epoch.
uint64_t getMilliseconds() {
    using namespace std::chrono;
    const auto since_epoch = system_clock::now().time_since_epoch();
    return duration_cast<milliseconds>(since_epoch).count();
}

auto createRequest(auto& cfg) {
    std::vector<drogon::HttpRequestPtr> tmp;
    for (auto& packet : cfg.request_packet) {
        for (auto& [buten, count] : packet.butens_request_num) {
            std::generate_n(std::back_inserter(tmp), count, [&] {
                auto request_ptr = drogon::HttpRequest::newHttpRequest();
                request_ptr->setPath(packet.http_path);
                request_ptr->setMethod(drogon::HttpMethod::Post);
                inja::Environment env;
                nlohmann::json data;
                data["buten_cd"] = buten;
                auto json_str = env.render(packet.http_body, data);
                nlohmann::json request = nlohmann::json::parse(json_str);
                request_ptr->setBody(request.dump());
                return request_ptr;
            });
        }
    }
    std::random_device rd;
    std::mt19937 gen(rd());
    std::shuffle(tmp.begin(), tmp.end(), gen);
    return std::queue<drogon::HttpRequestPtr>(tmp.begin(), tmp.end());
}

auto createHttpClient() {
    std::vector<drogon::HttpClientPtr> http_clients;
    auto loop_pool_ptr = std::make_shared<trantor::EventLoopThreadPool>(FLAGS_threads);
    loop_pool_ptr->start();
    auto loop_pool = std::make_shared<trantor::EventLoopThreadPool>(4);
    loop_pool->start();
    for (int32_t i = 0; i < FLAGS_http_client_count; ++i) {
        auto client = drogon::HttpClient::newHttpClient(FLAGS_host, loop_pool_ptr->getNextLoop());
        client->setSockOptCallback([&](int fd) {
            static int sndbuf_size = 1024 * 1024 * 1;
            setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf_size, sizeof(sndbuf_size));
            setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sndbuf_size, sizeof(sndbuf_size));
        });
        http_clients.emplace_back(std::move(client));
    }
    return std::make_tuple(http_clients, loop_pool_ptr, loop_pool);
}

/// Releases every client handle in `http_clients`, asks each loop of
/// `loop_pool_ptr` to quit from within its own loop, then waits on the pool.
///
/// @param http_clients   Container of client smart pointers; each element is reset.
/// @param loop_pool_ptr  Pointer-like handle to the loop pool to shut down.
void stop(auto& http_clients, auto& loop_pool_ptr) {
    std::for_each(http_clients.begin(), http_clients.end(), [](auto& client) { client.reset(); });

    for (auto& loop : loop_pool_ptr->getLoops()) {
        // Capture the loop handle by value; the quit call is executed by the loop itself.
        loop->runInLoop([loop] { loop->quit(); });
    }

    loop_pool_ptr->wait();
}

/// Load-tester entry point.
///
/// 1. Parses the command-line flags and loads the YAML config (FLAGS_cfg).
/// 2. Builds the shuffled request queue and the HTTP clients.
/// 3. Starts one coroutine per client; each repeatedly pops a request from the
///    shared queue, sends it with a 120 s timeout and records the outcome:
///      - ok:   status 200 and JSON field "result" == "Success"
///      - warn: any other response
///      - err:  an exception (network failure, timeout, invalid JSON, ...)
/// 4. Waits for all coroutines, logs latency/throughput statistics and stops
///    the client loops.
///
/// @return 0 on success, -1 when the config file cannot be loaded.
int main(int argc, char** argv) {
    trantor::Logger::setLogLevel(trantor::Logger::kDebug);
    gflags::SetVersionString("0.0.0.1");
    gflags::SetUsageMessage("Usage : ./ecgw-tester");
    google::ParseCommandLineFlags(&argc, &argv, true);

    auto [config, error] = yaml_cpp_struct::from_yaml<Config>(FLAGS_cfg);
    if (!config) {
        spdlog::error("{}", error);
        return -1;
    }
    auto& cfg = config.value();

    std::mutex mtx;  // guards http_requests
    auto http_requests = createRequest(cfg);
    const int32_t total_requests = static_cast<int32_t>(http_requests.size());

    spdlog::info("threads: {}", FLAGS_threads);
    spdlog::info("http_client_count: {}", FLAGS_http_client_count);
    spdlog::info("total_requests: {}", total_requests);

    auto [http_clients, loop_pool_ptr, worker_loop_pool] = createHttpClient();

    std::atomic<int64_t> send_request_num{0};
    std::atomic<int64_t> ok_count{0};
    std::atomic<int64_t> err_count{0};
    std::atomic<int64_t> warn_count{0};

    std::mutex latency_mtx;  // guards latency
    std::vector<int32_t> latency;
    if (total_requests > 0)
        latency.reserve(static_cast<std::size_t>(total_requests));

    std::vector<std::future<void>> futs;
    futs.reserve(http_clients.size());
    const auto start = getMilliseconds();

    for (auto& client : http_clients) {
        // The promise is fulfilled when this client's coroutine has drained its
        // share of the queue; main() blocks on the matching future below.
        auto p = std::make_shared<std::promise<void>>();
        futs.emplace_back(p->get_future());
        // Everything captured by reference lives in main(), which outlives the
        // coroutines because it waits on every future before returning.
        drogon::async_run([&, p, client]() mutable -> drogon::Task<> {
            while (true) {
                // co_await drogon::queueInLoopCoro(worker_loop_pool->getNextLoop(), [] {});
                const auto count = send_request_num.fetch_add(1, std::memory_order_relaxed);
                if (count >= total_requests)
                    break;

                int32_t remaining = 0;  // queue size after the pop; only logged on error
                drogon::HttpRequestPtr request_ptr;
                {
                    std::lock_guard lk(mtx);
                    if (http_requests.empty())
                        break;
                    request_ptr = std::move(http_requests.front());
                    http_requests.pop();
                    remaining = static_cast<int32_t>(http_requests.size());
                }

                const auto request_start = getMilliseconds();
                try {
                    auto resp = co_await client->sendRequestCoro(std::move(request_ptr), 120);
                    bool succeeded = false;
                    if (resp->getStatusCode() == drogon::HttpStatusCode::k200OK) {
                        nlohmann::json j = nlohmann::json::parse(resp->getBody());
                        succeeded = j.at("result").get<std::string>() == "Success";
                    }
                    if (succeeded)
                        ok_count++;
                    else
                        warn_count++;
                } catch (const std::exception& e) {
                    spdlog::error("{}: {}", e.what(), remaining);
                    err_count++;
                }
                const auto request_end = getMilliseconds();

                // Latency is recorded for every attempt, failed ones included.
                {
                    std::lock_guard lk(latency_mtx);
                    latency.emplace_back(static_cast<int32_t>(request_end - request_start));
                }
            }
            p->set_value();
            co_return;
        });
    }
    spdlog::info("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
    std::for_each(futs.begin(), futs.end(), [](auto& p) { p.wait(); });
    const auto end = getMilliseconds();
    spdlog::info("=====================================================================");

    int32_t min_value{0};
    int32_t max_value{0};
    double average{0.0};
    if (!latency.empty()) {
        const auto result = std::minmax_element(latency.begin(), latency.end());
        min_value = *result.first;
        max_value = *result.second;
        // Sum in 64 bits: many requests times multi-second latencies overflow int32.
        const int64_t sum = std::accumulate(latency.begin(), latency.end(), int64_t{0});
        average = static_cast<double>(sum) / static_cast<double>(latency.size());
    }
    // Guard against a zero (or, with a wall clock, negative) elapsed time.
    const double throughput = end > start ? total_requests / ((end - start) / 1000.0) : 0.0;

    spdlog::info("min latency[{}]/ms, max latency[{}]/ms,average latency[{}]/ms, Throughput[{}]/sec",
        min_value, max_value, average, throughput);
    spdlog::info("ok_count[{}], err_count[{}], warn_count[{}]", ok_count.load(), err_count.load(), warn_count.load());

    stop(http_clients, loop_pool_ptr);

    return 0;
}
nqf commented 1 year ago

客户端可能设了超时吧

下面是我的完整测试代码。我现在非常迷惑: 如果我在服务端不进行阻塞, 就不会出现网络错误。我设置的超时是 120 秒, 但实际上总共运行时间也没有 120 秒, 我不确定是不是 trantor 有 bug。

an-tao commented 1 year ago

服务端程序的最大文件描述符数量调整了么

nqf commented 1 year ago

服务端程序的最大文件描述符数量调整了么

服务端是 Boost.Beast, 如果文件描述符不够的话应该会报错

☁  bin [dev] ⚡  ulimit -n       
30000
nqf commented 1 year ago

如果我在服务端 sleep 10 毫秒, 就会出现很多很多网络错误; 如果我 co_await timeout(std::chrono::milliseconds(100)), 让出服务端工作线程, drogon 客户端就不会出现错误。目前只是 3000 个客户端, 每个客户端发送一个请求, 接到返回以后再发送另外一个, 但是不知道为什么会触发网络错误

        while (true) {
            boost::beast::flat_buffer buffer;
            boost::beast::http::request<boost::beast::http::string_body> request;
            auto result = co_await (boost::beast::http::async_read(stream, buffer, request, use_nothrow_awaitable) ||
                                    timeout(std::chrono::seconds(cfg.interval)));
            if (result.index() == 1) {
                SPDLOG_INFO("read timeout");
                co_return;
            }
            auto [ec, bytes_transferred] = std::get<0>(result);
            if (ec) {
                SPDLOG_INFO("333333333333333333async_read: {}", ec.message());
                co_return;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            // co_await timeout(std::chrono::milliseconds(100));
            SPDLOG_ERROR("{}", request.body());
            SPDLOG_ERROR("{}", request.target());
nqf commented 1 year ago

应该是 connect 出现错误了