drogonframework / drogon

Drogon: A C++14/17/20 based HTTP web application framework running on Linux/macOS/Unix/Windows

Need help on newStreamResponse + Video transcode #1315

Closed by panegyrize 6 months ago

panegyrize commented 2 years ago

Hi,

I am using Drogon to build a RESTful service that receives requests from clients to transcode video files. The server needs to stream the new video frames back to the client. Note that I do not want to save the transcoded video to a file and then stream that file back; I want to stream the video buffer on the fly.

At the time of writing this post, I have successfully registered the methods and implemented the core logic of the transcoding part. I also plan to use the newStreamResponse to stream the video buffer back to the client. I have been blocked by several issues on this implementation and would appreciate any comments and suggestions on this post.

  1. Video transcoding usually takes a long time, which blocks the IO thread if it is done inside the controller body. So I created a separate event loop thread to run this compute-intensive task. Is this two-thread mode the suggested approach for my case? If not, what is a better way to handle this scenario? If so, the question becomes: how can I pass the frames between the two threads so that newStreamResponse can pick up the data and use it inside the callback function? Currently, I transfer the frames through a typical producer-consumer buffer, i.e., one guarded by a mutex. But the IO thread is blocked while waiting for the buffer to be filled. Here is a code snippet that describes my service.
    
    DrogonController::DrogonController() {
        // trantor::EventLoopThread workerThread_;
        workerThread_.run();
    }

    void DrogonController::video(const drogon::HttpRequestPtr& request,
                                 std::function<void(const drogon::HttpResponsePtr&)>&& callback,
                                 const std::string& videoSrcFile) {
        // Parse the request parameters to find the path, other parameters, etc.
        parse();

        // Run the transcode in another thread.
        workerThread_.getLoop()->queueInLoop([&sync_buffer] {
            // This function gets queued for execution.
            video.transcode();
            pushFrameToBuffer(sync_buffer);
        });

        // newStreamResponse bindings.
        std::weak_ptr<DrogonController> thisWeakPtr = shared_from_this();
        auto streamCallback = [thisWeakPtr](char* pBuffer, std::size_t nBuffSize) -> std::size_t {
            auto thisPtr = thisWeakPtr.lock();
            if (!thisPtr) {
                LOG_ERROR << "Error.";
                return 0;
            }
            if (pBuffer == nullptr) {
                LOG_INFO << "File ends.";
                return 0;
            }

            // Pop the frames from the buffer.
            // However, the IO is blocked since it has to wait for the buffer to be filled by the other thread.
            popBuffer(sync_buffer);

            return bufferSize;
        };

        // Named streamCallback so that it does not shadow the `callback` parameter;
        // the lambda is passed directly, so no std::bind is needed.
        drogon::HttpResponsePtr response = drogon::HttpResponse::newStreamResponse(streamCallback, videoSrcFile);

        // Add content type to test with VLC
        response->setContentTypeString("video/mp4");

        // Callback
        callback(response);
    }



  2. The second question is about timeouts. Assume the transcoding of the current file takes a considerably long time to finish, say 10 minutes. Would Drogon impose any timeout or termination on the `newStreamResponse`?
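For reference, the mutex-guarded producer-consumer buffer described in question 1 could look roughly like the sketch below. It uses only the C++ standard library; `Frame`, `FrameQueue`, `push`, `pop`, `close`, and `produceAndDrain` are hypothetical names chosen for illustration, not part of Drogon or of the actual service, and the capacity of 2 is arbitrary.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical frame type and bounded queue; none of these names come from Drogon.
using Frame = std::vector<std::uint8_t>;

class FrameQueue {
public:
    explicit FrameQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Producer side: blocks while the queue is full.
    // Returns false if the queue was closed before the frame could be stored.
    bool push(Frame frame) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return closed_ || frames_.size() < capacity_; });
        if (closed_) return false;
        frames_.push_back(std::move(frame));
        notEmpty_.notify_one();
        return true;
    }

    // Consumer side: blocks while the queue is empty and still open.
    // Returns false once the queue is closed and fully drained.
    bool pop(Frame& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return closed_ || !frames_.empty(); });
        if (frames_.empty()) return false;
        out = std::move(frames_.front());
        frames_.pop_front();
        notFull_.notify_one();
        return true;
    }

    // Called by the producer after the last frame (or on error).
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        notEmpty_.notify_all();
        notFull_.notify_all();
    }

private:
    const std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::condition_variable notFull_;
    std::deque<Frame> frames_;
    bool closed_ = false;
};

// Demo: a producer thread stands in for the transcoder; the caller is the consumer.
std::vector<Frame> produceAndDrain(std::size_t frameCount) {
    FrameQueue queue(2);
    std::thread producer([&queue, frameCount] {
        for (std::size_t i = 0; i < frameCount; ++i) {
            queue.push(Frame{static_cast<std::uint8_t>(i)});
        }
        queue.close();
    });
    std::vector<Frame> received;
    Frame frame;
    while (queue.pop(frame)) received.push_back(std::move(frame));
    producer.join();
    return received;
}
```

The blocking `pop` in this sketch is exactly the part that stalls an IO thread when it is called from inside the stream callback, which is the problem described in question 1.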

Thank you all!

hwc0919 commented 2 years ago

> Is the two-thread-mode a suggested way for my case?

Yes. Furthermore, you can create a thread pool to do the transcoding work.

> If so, the question now becomes: how can I pass the frames between the two threads so the newStreamResponse can pick up the data and use it inside the callback function.

You can pass everything to your worker thread when you receive the request, and do everything there. In this case you don't need to worry about synchronization problems.

// Pseudocode
void Controller::method(const HttpRequestPtr & req, Callback && callback, std::string && video)
{
    workingThreadPool->queue([req, callback = std::move(callback), video = std::move(video)](){
        ...
    });
}
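As a rough illustration of the hand-off in the pseudocode above, here is a minimal standard-library sketch. `WorkerPool`, `Callback`, and `handleVideo` are hypothetical stand-ins (they are not Drogon or trantor types), the response callback is simplified to take a string, and the "transcoding" is only a placeholder.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the worker pool named in the pseudocode.
class WorkerPool {
public:
    explicit WorkerPool(std::size_t threadCount) {
        assert(threadCount > 0);
        for (std::size_t i = 0; i < threadCount; ++i) {
            threads_.emplace_back([this] { run(); });
        }
    }

    // Finishes the tasks that are already queued, then joins all worker threads.
    ~WorkerPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wakeup_.notify_all();
        for (auto& t : threads_) t.join();
    }

    void queue(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        wakeup_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wakeup_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopping and nothing left to do
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock so other workers can proceed
        }
    }

    std::mutex mutex_;
    std::condition_variable wakeup_;
    std::queue<std::function<void()>> tasks_;
    std::vector<std::thread> threads_;
    bool stopping_ = false;
};

// Simplified response callback (assumption: the real one takes an HttpResponsePtr).
using Callback = std::function<void(const std::string&)>;

// Mirrors the pseudocode: the handler only enqueues the work and returns immediately.
void handleVideo(WorkerPool& pool, Callback&& callback, std::string&& video) {
    pool.queue([callback = std::move(callback), video = std::move(video)] {
        // Placeholder for the real transcoding work.
        callback("transcoded:" + video);
    });
}
```

Because the callback and the video path are moved into the task, the worker owns everything it needs, and the calling thread shares no state with it afterwards.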

panegyrize commented 2 years ago

Thanks for the response, @hwc0919.

Is this workingThreadPool shared among all the threads that I launch in the main loop function e.g., drogon::app().setThreadNum(12);? Or is it tied to the single thread which runs the heavy workload?

hwc0919 commented 2 years ago

> Is this workingThreadPool shared among all the threads that I launch in the main loop function e.g., drogon::app().setThreadNum(12);? Or is it tied to the single thread which runs the heavy workload?

The latter, but not exactly (whether to use one thread or several is your choice). drogon::app().setThreadNum(12) sets the number of IO loops, which you should not use for CPU-intensive work, because that would block subsequent requests. CPU-intensive work should be processed in standalone thread(s). You can use trantor::EventLoopThreadPool, trantor::ConcurrentTaskQueue, trantor::SerialTaskQueue, or your own implementation as your worker thread pool. The number of threads should depend on your workload.