drogonframework / drogon

Drogon: A C++14/17/20 based HTTP web application framework running on Linux/macOS/Unix/Windows
MIT License

AsyncStream Memory Leak #2192

Open simoberny opened 1 month ago

simoberny commented 1 month ago

Hi, I’m encountering an issue with the send function of the ResponseStream in my application. The goal is to stream images to a web page, and while it works correctly at first, I’ve noticed that over time the RAM usage gradually increases (by several MB every few seconds).

Interestingly, this problem appears to be linked to network speed or buffer buildup. If I increase the pause between sending images, the memory increase slows down or disappears. Additionally, sending smaller payloads prevents the issue entirely. When testing on a gigabit Ethernet connection, the problem doesn’t occur, but it becomes noticeable when using slower connections, such as a 4G SIM network.

It seems like the data is not being consumed fast enough on slower networks, leading to memory buildup. Could this be related to how the internal buffer is managed within the ResponseStream? Any guidance on how to handle this situation or optimize the streaming would be appreciated.
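To make the suspected buildup concrete, here is a rough back-of-the-envelope sketch. It assumes that bytes which are produced but not yet drained by the link simply accumulate in memory. The function name `estimateBacklogBytes`, the 100 kB frame size, the 30 ms interval and the link rates are all illustrative assumptions, not measurements and not part of Drogon.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical model (not a Drogon API): estimate how many bytes pile up on
// the sending side after `seconds`, when frames of `frameBytes` are produced
// every `intervalMs` and the link drains `linkBytesPerSec`.
std::int64_t estimateBacklogBytes(std::int64_t frameBytes,
                                  std::int64_t intervalMs,
                                  std::int64_t linkBytesPerSec,
                                  std::int64_t seconds)
{
    assert(intervalMs > 0);
    // Bytes handed to the stream per second (integer arithmetic).
    const std::int64_t producedPerSec = frameBytes * 1000 / intervalMs;
    // Only the surplus over what the link can drain accumulates.
    const std::int64_t growthPerSec =
        std::max<std::int64_t>(0, producedPerSec - linkBytesPerSec);
    return growthPerSec * seconds;
}
```

With these assumed numbers, a 100 kB frame every 30 ms is about 3.3 MB/s. On a link draining 1 MB/s the model predicts roughly 23 MB of backlog after 10 seconds, while a gigabit link (about 125 MB/s) keeps up and the backlog stays at zero. That matches the pattern described above, but it is only a model of the hypothesis, not a confirmation of where the memory goes.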

Thanks in advance.

To Reproduce

Steps to reproduce the behavior:

  1. Implement an AsyncStreamResponse
  2. Loop infinitely and send something "big" like an image
  3. Look at the RAM increasing

Code:

app().registerHandler(
            "/stream",
            [](const HttpRequestPtr &req,
               std::function<void(const HttpResponsePtr &)> &&callback)
            {
                auto resp = HttpResponse::newAsyncStreamResponse(
                    [](ResponseStreamPtr stream)
                    {
                        std::thread([stream =
                                         std::shared_ptr<ResponseStream>{
                                             std::move(stream)}]() mutable
                                    {
                        long long int last_send = 0;
                        cfg.streaming_ = true;

                        std::vector<int> param(2);
                        param[0] = cv::IMWRITE_JPEG_QUALITY;
                        param[1] = 60; // default(95) 0-100

                        std::vector<uint8_t> buffer;

                        std::string payload = "";

                        while(not sgn_thread)
                        {
                            if (not cfg.vehicle_img_.empty())
                            {
                                if(cfg.captured_img_ != last_send)
                                {
                                    cv::imencode(".jpg", cfg.vehicle_img_, buffer, param);

                                    // Send the boundary and the header for the new image
                                    payload = "\r\n--" + kPartBoundary + "\r\nContent-Type: image/jpeg\r\nContent-Length: " + std::to_string(buffer.size()) + "\r\n\r\n";
                                    payload.append(buffer.begin(), buffer.end());

                                    if(stream->send(payload))
                                    {
                                        last_send = cfg.captured_img_;
                                        std::this_thread::sleep_for(std::chrono::milliseconds(25));
                                    }
                                    else 
                                    {
                                        break;
                                    }
                                }

                                std::this_thread::sleep_for(std::chrono::milliseconds(5));
                            }

                            buffer.clear();
                        }

                        stream->close(); 
                        cfg.streaming_ = false;

                        }).detach();
                    }, true);

                resp->setContentTypeString(kStreamContent);
                resp->addHeader("Access-Control-Allow-Origin", "*");
                callback(resp);
            },
            {Get});
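
One possible workaround, if the growth really comes from frames being queued faster than the link drains them, is to skip frames on the application side. The sketch below is a plain leaky-bucket estimate and makes no use of Drogon internals: the class `LeakyBucketGate`, its drain rate and its cap are hypothetical, and the drain rate is a guess at the link speed rather than a measurement of the actual send buffer.

```cpp
#include <algorithm>
#include <chrono>
#include <cstdint>

// Hypothetical helper (not part of Drogon): tracks an *estimate* of bytes
// still in flight and refuses new frames once the estimate exceeds a cap.
class LeakyBucketGate
{
  public:
    using Clock = std::chrono::steady_clock;

    LeakyBucketGate(std::int64_t drainBytesPerSec, std::int64_t capBytes)
        : drainBytesPerSec_(drainBytesPerSec), capBytes_(capBytes)
    {
    }

    // Returns true if a frame of `bytes` fits in the estimated budget at
    // time `now`; the caller should drop the frame when this returns false.
    bool tryAdmit(std::int64_t bytes, Clock::time_point now)
    {
        if (started_)
        {
            // Assume the link drained at the configured rate since last call.
            const auto elapsedMs =
                std::chrono::duration_cast<std::chrono::milliseconds>(now -
                                                                      last_)
                    .count();
            const std::int64_t drained = drainBytesPerSec_ * elapsedMs / 1000;
            pending_ = std::max<std::int64_t>(0, pending_ - drained);
        }
        last_ = now;
        started_ = true;

        if (pending_ + bytes > capBytes_)
            return false;  // estimated backlog too large: skip this frame
        pending_ += bytes;
        return true;
    }

    std::int64_t pendingBytes() const
    {
        return pending_;
    }

  private:
    std::int64_t drainBytesPerSec_;
    std::int64_t capBytes_;
    std::int64_t pending_{0};
    Clock::time_point last_{};
    bool started_{false};
};
```

In the loop above, this would mean calling something like `gate.tryAdmit(payload.size(), LeakyBucketGate::Clock::now())` before `stream->send(payload)` and skipping the frame when it returns false. For a live image stream, dropping a stale frame is usually preferable to queueing it, but the right drain rate and cap depend on the actual connection and would need tuning.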