alliedvision / VimbaPython

Old Allied Vision Vimba Python API. The successor to this API is VmbPy
BSD 2-Clause "Simplified" License
93 stars 40 forks source link

Write video to file #154

Open Naomipython opened 1 year ago

Naomipython commented 1 year ago

Hello!

I am trying to write frames to a file, but it does not work. There are no error messages, but the file I write the frames to does not work. We are using the USB camera (1800 U-240c).

I used the Asynchronous Grab with OpenCV Example to start with, and modified it to write the file with open cv. When I run the script, it does show the current frames (although it is slow) but when I click on the file that was made (open with standard windows media player or similar), the media player gives an error message.

I am just a beginner with open cv and vimba, so I hope someone here knows more.

import threading
import numpy as np
import sys
import cv2
from typing import Optional
from vimba import *
from datetime import datetime
import time
from cv2 import VideoWriter
from cv2 import VideoWriter_fourcc
import pandas as pd

def print_preamble():
    print('///////////////////////////////////////////////////////')
    print('/// Vimba API Asynchronous Grab with OpenCV Example ///')
    print('///////////////////////////////////////////////////////\n')

def print_usage():
    print('Usage:')
    print('    python asynchronous_grab_opencv.py [camera_id]')
    print('    python asynchronous_grab_opencv.py [/h] [-h]')
    print()
    print('Parameters:')
    print('    camera_id   ID of the camera to use (using first camera if not specified)')
    print()

def abort(reason: str, return_code: int = 1, usage: bool = False):
    print(reason + '\n')

    if usage:
        print_usage()

    sys.exit(return_code)

def parse_args() -> Optional[str]:
    args = sys.argv[1:]
    argc = len(args)

    for arg in args:
        if arg in ('/h', '-h'):
            print_usage()
            sys.exit(0)

    if argc > 1:
        abort(reason="Invalid number of arguments. Abort.", return_code=2, usage=True)

    return None if argc == 0 else args[0]

def get_camera(camera_id: Optional[str]) -> Camera:
    with Vimba.get_instance() as vimba:
        if camera_id:
            try:
                return vimba.get_camera_by_id(camera_id)

            except VimbaCameraError:
                abort('Failed to access Camera \'{}\'. Abort.'.format(camera_id))

        else:
            cams = vimba.get_all_cameras()
            if not cams:
                abort('No Cameras accessible. Abort.')

            return cams[0]

def setup_camera(cam: Camera):
    with cam:
        # Enable auto exposure time setting if camera supports it
        try:
            cam.ExposureAuto.set('Continuous')

        except (AttributeError, VimbaFeatureError):
            pass

        # Enable white balancing if camera supports it
        try:
            cam.BalanceWhiteAuto.set('Continuous')

        except (AttributeError, VimbaFeatureError):
            pass

        # Try to adjust GeV packet size. This Feature is only available for GigE - Cameras.
        try:
            cam.GVSPAdjustPacketSize.run()

            while not cam.GVSPAdjustPacketSize.is_done():
                pass

        except (AttributeError, VimbaFeatureError):
            pass

        # Query available, open_cv compatible pixel formats
        # prefer color formats over monochrome formats
        cv_fmts = intersect_pixel_formats(cam.get_pixel_formats(), OPENCV_PIXEL_FORMATS)
        color_fmts = intersect_pixel_formats(cv_fmts, COLOR_PIXEL_FORMATS)

        if color_fmts:
            cam.set_pixel_format(color_fmts[0])

        else:
            mono_fmts = intersect_pixel_formats(cv_fmts, MONO_PIXEL_FORMATS)

            if mono_fmts:
                cam.set_pixel_format(mono_fmts[0])
                print(cam.set_pixel_format(mono_fmts[0]))

            else:
                abort('Camera does not support a OpenCV compatible format natively. Abort.')

def main():
    print_preamble()

    with Vimba.get_instance() as vimba:
        cams = vimba.get_all_cameras()
        with cams[0] as cam:
            setup_camera(cam)
            video = VideoWriter('newvid.avi', VideoWriter_fourcc(*'MJPG'), 25.0, (1216, 1936))

            while True:
                frame = cam.get_frame()
                frame.convert_pixel_format(PixelFormat.Bgra8)
                frame = frame.as_opencv_image()

                cv2.imshow('show', frame)

                video.write(frame)
                # check if frame size is correct
                # newframe = np.reshape(np.array(frame), (frame.shape[0], frame.shape[1]))
                # DF = pd.DataFrame(newframe)
                # DF.to_csv('data1.csv')

                if cv2.waitKey(1) & 0xFF == ord('d'):
                    break

            cam.stop_streaming()
            cv2.destroyAllWindows()
            video.release()

if __name__ == '__main__':
    main()
arunprakash-avt commented 1 year ago

With respect tot the above issue I would request you to refer tot the below example code:

import sys import threading from typing import Optional from vimba import * import cv2 import queue

frame_queue = queue.Queue()

writer = None

def print_preamble(): print('///////////////////////////////////////////') print('/// Vimba API Asynchronous Grab Example ///') print('///////////////////////////////////////////\n')

def print_usage(): print('Usage:') print(' python asynchronous_grab.py [camera_id]') print(' python asynchronous_grab.py [/h] [-h]') print() print('Parameters:') print(' camera_id ID of the camera to use (using first camera if not specified)') print()

def abort(reason: str, return_code: int = 1, usage: bool = False): print(reason + '\n')

if usage:
    print_usage()

sys.exit(return_code)

def parse_args() -> Optional[str]: args = sys.argv[1:] argc = len(args)

for arg in args:
    if arg in ('/h', '-h'):
        print_usage()
        sys.exit(0)

if argc > 1:
    abort(reason="Invalid number of arguments. Abort.", return_code=2, usage=True)

return None if argc == 0 else args[0]

def get_camera(camera_id: Optional[str]) -> Camera: with Vimba.get_instance() as vimba: if camera_id: try: return vimba.get_camera_by_id(camera_id)

        except VimbaCameraError:
            abort('Failed to access Camera \'{}\'. Abort.'.format(camera_id))

    else:
        cams = vimba.get_all_cameras()
        if not cams:
            abort('No Cameras accessible. Abort.')

        return cams[0]

def setup_camera(cam: Camera): with cam:

Try to adjust GeV packet size. This Feature is only available for GigE - Cameras.

    try:
        cam.GVSPAdjustPacketSize.run()

        while not cam.GVSPAdjustPacketSize.is_done():
            pass

    except (AttributeError, VimbaFeatureError):
        pass
    # Set acquistion frame rate to 25 fps
    try:
        cam.TriggerSource.Set("FixedRate")
        cam.AcquisitionFrameRateAbs.Set(25)

    except (AttributeError, VimbaFeatureError):
        pass

def frame_handler(cam: Camera, frame: Frame): print('{} acquired {}'.format(cam, frame), flush=True)

global frame_queue
frame_queue.put(frame.as_opencv_image())
cv_image = frame.as_opencv_image()
cv2.imshow("Display", cv_image)
cv2.waitKey(1)
cam.queue_frame(frame)

def write_to_file(): global frame_queue global writer

while True:
    img = frame_queue.get()
    print("took image from queue")
    if writer is None:
        height, width = img.shape[0:2]
        size = (width, height)
        print(size)
        fourcc = cv2.VideoWriter_fourcc(*'MJPG')
        writer = cv2.VideoWriter('filename2.avi',
                                 fourcc,
                                 25,
                                 size)

    writer.write(cv2.cvtColor(img, cv2.COLOR_GRAY2BGR))
    frame_queue.task_done()

def main(): print_preamble() cam_id = parse_args()

with Vimba.get_instance():
    with get_camera(cam_id) as cam:

        setup_camera(cam)
        print('Press <enter> to stop Frame acquisition.')

        try:
            # Start Streaming with a custom a buffer of 10 Frames (defaults to 5)
            threading.Thread(target=write_to_file, daemon=True).start()
            cam.start_streaming(handler=frame_handler, buffer_count=10)
            input()

        finally:
            cam.stop_streaming()
            frame_queue.join()
            writer.release()

if name == 'main': main()

aakash-sarin commented 11 months ago

Hi, Im hitting enter after the required number of frames have been captured but it returns an AttributeError: 'numpy.ndarray' object has no attribute 'join'. How do i resolve this?