CrowCpp / Crow

A Fast and Easy to use microframework for the web.
https://crowcpp.org

MJPEG support within crow #890

Open Rz-Rz opened 2 months ago

Rz-Rz commented 2 months ago

Hello, I'm trying to implement MJPEG streaming using Crow. I had an example working with Flask and tried to convert it to Crow. This is the code I had in Flask:

from flask import Flask, Response, request

app = Flask(__name__)

# Buffer to hold the stream data
buffer = b""
buffer_limit = 1024 * 1024  # 1 MB buffer limit to avoid overflows

@app.route('/streaming/jpeg/test', methods=['GET'])
def index():
    # Serve a simple HTML page to display the video stream
    return '''
    <html>
        <head>
            <title>Video Stream</title>
        </head>
        <body>
            <h1>Live Stream</h1>
            <img src="/streaming/jpeg/test1" />
        </body>
    </html>
    '''

@app.route('/streaming/jpeg/test', methods=['PUT'])
def stream_input():
    global buffer
    new_data = request.data  # Capture the incoming stream from GStreamer

    # Limit the buffer size to prevent memory overflow
    if len(buffer) + len(new_data) > buffer_limit:
        buffer = buffer[len(new_data):]  # Trim buffer if it exceeds limit

    buffer += new_data  # Append new data to buffer
    return '', 200

@app.route('/streaming/jpeg/test1', methods=['GET'])
def stream_output():
    def generate():
        global buffer
        boundary = b"--frame\r\n"
        while True:
            if buffer:
                # Ensure the buffer contains a valid frame boundary before splitting
                if b'\r\n\r\n' in buffer:
                    frame, remaining_buffer = buffer.split(b'\r\n\r\n', 1)
                    buffer = remaining_buffer  # Reassign the remaining buffer
                    yield boundary
                    yield b"Content-Type: image/jpeg\r\n\r\n"  # JPEG frame header
                    yield frame  # Actual image frame
                    yield b"\r\n"  # End of frame boundary
                else:
                    continue  # Wait for a valid frame boundary to appear in the buffer
    return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

I am generating the image stream using GStreamer and my webcam:

gst-launch-1.0 v4l2src device=/dev/video0 ! image/jpeg,framerate=30/1,width=1280,height=720 ! jpegdec ! \
    videoconvert ! \
    videorate max-rate=30 ! \
    jpegenc ! \
    multipartmux boundary="frame" ! \
    souphttpclientsink location=http://localhost:5000/streaming/jpeg/test

I tried to implement the following in Crow:

crow::response html_streaming(const crow::request &req,
                              const std::string &name) {

  int fps = 5;
  auto body = crow::json::load(req.body);
  if (body && body.has("fps")) {
    fps = body["fps"].i();
  }

  auto it = stream_threads.find(name);
  if (it != stream_threads.end()) {
    StreamThread *thread = it->second.get();
    thread->add_transformed_streamjpeg_sink(fps);
  }
  // Construct the HTML content as a string
  std::string response_html =
      "<html><body style=\"width: 800px; height: 600px; overflow: hidden;\">"
      "<img src=\"/streaming/jpeg/" +
      name +
      "\">"
      "</body></html>";

  // Return the HTML content as a response with status 200 and Content-Type
  // "text/html"
  crow::response res(response_html);
  res.code = 200;
  res.set_header("Content-Type", "text/html");

  return res;
}

// Put route to receive stream data
crow::response put_streaming_jpeg(const crow::request &req,
                                  const std::string &name) {
  std::vector<uint8_t> new_data(req.body.begin(), req.body.end());
  jpegStreamManager.addData(name, new_data);
  return crow::response(200);
}

// Streaming route to output the stream as MJPEG
crow::response get_streaming_jpeg(const crow::request &req,
                                  const std::string &name) {
  crow::response res;
  res.set_header("Content-Type", "multipart/x-mixed-replace; boundary=frame");

  std::string boundary = "--frame\r\n";

  while (true) {
    if (jpegStreamManager.hasData(name)) {
      auto frame = jpegStreamManager.getFrame(name);
      if (!frame.empty()) {
        res.body += boundary;
        res.body += "Content-Type: image/jpeg\r\n\r\n";
        res.body.append(reinterpret_cast<const char *>(frame.data()),
                        frame.size());
        res.body += "\r\n";
      }
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
  }

  return res;
}

// Further down, inside main():
  CROW_ROUTE(app, "/html/streaming/<string>")
      .methods(crow::HTTPMethod::Get)(html_streaming);

  CROW_ROUTE(app, "/streaming/jpeg/<string>")
      .methods(crow::HTTPMethod::Put)(put_streaming_jpeg);

  CROW_ROUTE(app, "/streaming/jpeg/<string>")
      .methods(crow::HTTPMethod::Get)(get_streaming_jpeg);

And finally, this is what I use to manage buffers:

#include "JpegStreamManager.hpp"
#include <algorithm>
#include <iostream>

JpegStreamManager::JpegStreamManager(size_t limit) : buffer_limit(limit) {}

JpegStreamManager::~JpegStreamManager() {
  for (auto &pair : stream_buffers) {
    pair.second.clear();
  }
}

void JpegStreamManager::addData(const std::string &stream_id,
                                const std::vector<uint8_t> &new_data) {
  std::lock_guard<std::mutex> lock(stream_mutexes[stream_id]);
  // Initialize buffer if it doesn't exist
  if (stream_buffers.find(stream_id) == stream_buffers.end()) {
    stream_buffers[stream_id] = std::vector<uint8_t>();
  }

  auto &buffer = stream_buffers[stream_id];

  // Ensure this buffer doesn't exceed the size limit
  if (buffer.size() + new_data.size() > buffer_limit) {
    std::cout << "Buffer overflow detected for stream " << stream_id
              << ", erasing..." << std::endl;
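    // Clamp to the current size: a single chunk larger than buffer_limit
    // would otherwise erase past the end of the buffer.
    size_t overflow_size = std::min(
        buffer.size() + new_data.size() - buffer_limit, buffer.size());
    buffer.erase(buffer.begin(), buffer.begin() + overflow_size);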
  }

  buffer.insert(buffer.end(), new_data.begin(), new_data.end());
}

// Function to retrieve a frame from a specific stream
std::vector<uint8_t> JpegStreamManager::getFrame(const std::string &stream_id) {
  std::lock_guard<std::mutex> lock(stream_mutexes[stream_id]);
  if (stream_buffers.find(stream_id) == stream_buffers.end()) {
    std::cout << "Frame requested for non-existent stream " << stream_id
              << std::endl;
    return {};
  }

  auto &buffer = stream_buffers[stream_id];
  std::vector<uint8_t> frame;

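  // Find the frame boundary. Use one named array so both iterators refer to
  // the same object; std::begin and std::end applied to two separate string
  // literals are not guaranteed to point into the same array.
  static const char delimiter[] = "\r\n\r\n";
  auto it = std::search(buffer.begin(), buffer.end(), std::begin(delimiter),
                        std::end(delimiter) - 1);
  if (it != buffer.end()) {
    frame.assign(buffer.begin(), it + 4);
    buffer.erase(buffer.begin(), it + 4);
  }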
  return frame;
}

// Function to check if a stream has data
bool JpegStreamManager::hasData(const std::string &stream_id) {
  std::lock_guard<std::mutex> lock(stream_mutexes[stream_id]);
  return stream_buffers.find(stream_id) != stream_buffers.end() &&
         !stream_buffers[stream_id].empty();
}

I can't seem to load images: the browser refreshes constantly without showing any image. When I access http://localhost:5000/streaming/jpeg/test1, I get redirected to the index at /.

EDIT: I believe the problem lies here: Crow's HTTP response model is different from Flask's, and it requires that the response is returned and completed at the end of the route handler. Crow does not have a direct equivalent to Flask's yield, which allows for continuous response streaming.

Rz-Rz commented 1 month ago

I want to add the yield functionality to the web server. Can anyone point me to where to start?

The-EDev commented 1 week ago

OK, I see two main problems here. The first (and not the critical one) is that Crow's handlers are blocking: while you're waiting for a valid frame to arrive, you cannot do anything else with that thread. Crow's async functionality is another massive can of worms that I don't want to open.

The second issue is the way Crow handles HTTP connections. Basically it's read_request -> parse_request -> run_handler -> send_response -> repeat. This means the handler is simply a function that sits between the read and the write: it takes a request and produces a response.

Theoretically speaking, you would need to bypass this loop and use the connection to send the response data without triggering the read part of the loop.
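
To make that idea concrete, here is a minimal, transport-agnostic sketch of what such a bypass would have to do: write the response head once, then keep writing one multipart part per frame until the source runs dry or the client goes away. Nothing here is Crow API. The names stream_mjpeg, FrameSource and ByteSink are hypothetical, and the sink stands in for whatever low-level "write these bytes to the connection" call ends up being available.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

using Frame = std::vector<std::uint8_t>;
// Hypothetical callbacks: the source fills `out` and returns false when the
// stream has ended; the sink returns false when the client has disconnected.
using FrameSource = std::function<bool(Frame& out)>;
using ByteSink = std::function<bool(const std::string& bytes)>;

// Sends the HTTP head once, then one multipart part per frame.
// Returns the number of frames written successfully.
std::size_t stream_mjpeg(const FrameSource& next_frame, const ByteSink& write) {
  assert(next_frame && write);

  const std::string head =
      "HTTP/1.1 200 OK\r\n"
      "Content-Type: multipart/x-mixed-replace; boundary=frame\r\n"
      "Cache-Control: no-cache\r\n"
      "Connection: close\r\n\r\n";
  if (!write(head)) return 0;

  std::size_t sent = 0;
  Frame frame;
  while (next_frame(frame)) {
    std::string part = "--frame\r\nContent-Type: image/jpeg\r\nContent-Length: " +
                       std::to_string(frame.size()) + "\r\n\r\n";
    part.append(reinterpret_cast<const char*>(frame.data()), frame.size());
    part += "\r\n";
    if (!write(part)) break;  // client went away: stop streaming
    ++sent;
  }
  return sent;
}
```

The point of the callback shape is that the loop never returns to the read side of the connection: the same function can be tested against a string buffer and later pointed at a real socket write.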

IIRC I was working on something similar a long time ago but ultimately gave up, please check out the res-push branch.

Please keep in mind this was in my early days of working on Crow and I largely had no idea what I was doing.

I'd be happy to assist you in any way I can to get this feature going.