alliedvision / VimbaPython

Old Allied Vision Vimba Python API. The successor to this API is VmbPy
BSD 2-Clause "Simplified" License
93 stars 40 forks source link

streaming asynchronous_ grab using flask #136

Open Lucas-Mantuan opened 1 year ago

Lucas-Mantuan commented 1 year ago

Hey, all.

I need to use asynchronous mode to streaming frames from camera using flask.

Below is my code, based on 'Examples/asynchronous_grab.py' and the flask code I got this reference: https://towardsdatascience.com/video-streaming-in-web-browsers-with-opencv-flask-93a38846fe00.

import sys
from typing import Optional, Tuple
from vimba import *

#Import necessary libraries
from flask import Flask, Response
import cv2
import threading  

#Initialize the Flask app
app = Flask(__name__)

def print_preamble():
    print('///////////////////////////////////////////')
    print('/// Vimba API Asynchronous Grab Example ///')
    print('///////////////////////////////////////////\n')

def print_usage():
    print('Usage:')
    print('    python asynchronous_grab.py [/x] [-x] [camera_id]')
    print('    python asynchronous_grab.py [/h] [-h]')
    print()
    print('Parameters:')
    print('    /x, -x      If set, use AllocAndAnnounce mode of buffer allocation')
    print('    camera_id   ID of the camera to use (using first camera if not specified)')
    print()

def abort(reason: str, return_code: int = 1, usage: bool = False):
    print(reason + '\n')

    if usage:
        print_usage()

    sys.exit(return_code)

def parse_args() -> Tuple[Optional[str], AllocationMode]:
    args = sys.argv[1:]
    argc = len(args)

    allocation_mode = AllocationMode.AnnounceFrame
    cam_id = ""
    for arg in args:
        if arg in ('/h', '-h'):
            print_usage()
            sys.exit(0)
        elif arg in ('/x', '-x'):
            allocation_mode = AllocationMode.AllocAndAnnounceFrame
        elif not cam_id:
            cam_id = arg

    if argc > 2:
        abort(reason="Invalid number of arguments. Abort.", return_code=2, usage=True)

    return (cam_id if cam_id else None, allocation_mode)

def get_camera(camera_id: Optional[str]) -> Camera:
    with Vimba.get_instance() as vimba:
        if camera_id:
            try:
                return vimba.get_camera_by_id(camera_id)

            except VimbaCameraError:
                abort('Failed to access Camera \'{}\'. Abort.'.format(camera_id))

        else:
            cams = vimba.get_all_cameras()
            if not cams:
                abort('No Cameras accessible. Abort.')

            return cams[0]

def setup_camera(cam: Camera):
    with cam:
        # Enable auto exposure time setting if camera supports it
        try:
            cam.ExposureAuto.set('Continuous')

        except (AttributeError, VimbaFeatureError):
            pass

        # Enable white balancing if camera supports it
        try:
            cam.BalanceWhiteAuto.set('Continuous')

        except (AttributeError, VimbaFeatureError):
            pass

        # Try to adjust GeV packet size. This Feature is only available for GigE - Cameras.
        try:
            cam.GVSPAdjustPacketSize.run()

            while not cam.GVSPAdjustPacketSize.is_done():
                pass

        except (AttributeError, VimbaFeatureError):
            pass

        # Query available, open_cv compatible pixel formats
        # prefer color formats over monochrome formats
        cv_fmts = intersect_pixel_formats(cam.get_pixel_formats(), OPENCV_PIXEL_FORMATS)
        color_fmts = intersect_pixel_formats(cv_fmts, COLOR_PIXEL_FORMATS)

        if color_fmts:
            cam.set_pixel_format(color_fmts[0])

        else:
            mono_fmts = intersect_pixel_formats(cv_fmts, MONO_PIXEL_FORMATS)

            if mono_fmts:
                cam.set_pixel_format(mono_fmts[0])

            else:
                abort('Camera does not support a OpenCV compatible format natively. Abort')        

def gen_frames(frame):  
        print('hello')
        return (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')

def frame_handler(cam: Camera, frame: Frame):
    if frame.get_status() == FrameStatus.Complete:
        print('{} acquired {}'.format(cam, frame), flush=True)

        # ===============
        # ===============
        img = frame.as_opencv_image()
        rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        ret, jpeg = cv2.imencode('.jpg', rgb_img)
        fr = jpeg.tobytes()

        gen_frames(fr)
        cam.queue_frame(frame)

        # ===============
        # ===============

def main():
    print_preamble()
    cam_id, allocation_mode = parse_args()

    with Vimba.get_instance() as vimba:
        cams = vimba.get_all_cameras()
        with cams[0] as cam:
            setup_camera(cam)
            print('Press <enter> to stop Frame acquisition.')

            try:
                # Start Streaming with a custom a buffer of 10 Frames (defaults to 5)
                cam.start_streaming(handler=frame_handler, buffer_count=10, allocation_mode=allocation_mode)
                input()

            finally:
                cam.stop_streaming()

@app.route('/video_feed')
def video_feed():
    return Response(frame_handler(), mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == '__main__':
    threading.Thread(target = main).start()
    threading.Thread(target = app.run(debug=True, host="0.0.0.0", port=8888)).start()

I'm facing this error when I open the local server, but I can't find out a way to resolve it. Hope somebody can help me with this issue.

image

Observations: I'm using a USB-camera model

Teresa-AlliedVision commented 1 year ago

Hello Lucas-Mantuan, my collegue checked your code and was able to execute it after modifying the line here; def video_feed(): return Response(frame_handler(), mimetype='multipart/x-mixed-replace; boundary=frame') You just need to give the frame_handler cam and frame as an argument and it should run. Cheers, Teresa