alliedvision / VimbaPython

Old Allied Vision Vimba Python API. The successor to this API is VmbPy
BSD 2-Clause "Simplified" License

Writing to video file #83

Open nisckhan13 opened 2 years ago

nisckhan13 commented 2 years ago

Hi,

I am attempting to write my acquired frames straight to a video file. I followed the example in Issue #62 and was able to acquire 50 frames and write them to disk as individual images.

Function to write images:

    while True:
        # Get an element from the queue.
        frame, id = frame_queue.get()
        cv2.imwrite(f'output/image_{id}.png', frame)
        # let the queue know we are finished with this element so the main thread can figure out
        # when the queue is finished completely
        frame_queue.task_done()

Main:

      num_pics = 50
      with vimba.Vimba.get_instance() as vmb:
          cams = vmb.get_all_cameras()

          frame_queue = queue.Queue()
          recorder = ImageRecorder(cam=cams[0], frame_queue=frame_queue)
          # Start a thread that runs write_image(frame_queue). Marking it as daemon allows the python
          # program to exit even though that thread is still running. The thread will then be stopped
          # when the program exits
          threading.Thread(target=functools.partial(write_image, frame_queue), daemon=True).start()

          recorder.record_images(num_pics=num_pics)
          frame_queue.join()

I then attempted to write a function to save the images from the queue into a video file using the following code:

Video Function:

    fourcc = cv2.VideoWriter_fourcc(*'H264')
    frontVid = cv2.VideoWriter('output/testVideo.avi',fourcc,10,(1456,1088))
    id = 0
    while id < 49:
        frame, id = frame_queue.get()
        frontVid.write(frame)
        frame_queue.task_done()
    frontVid.release

The video file does write to disk but seems to be corrupted and is unreadable (see attached file).

Do you have any advice on a better way to do this or if there is something wrong with my write video code? It might have something to do with the way I'm writing the frame to the video but I'm unsure. An alternative could be to just acquire as images and then compile the images into a video file after acquisition. However, I would like to write straight to video if possible.

The camera I am using is: Alvium 1800U-158m and I am using Vimba 5.0 on Python 3.9.8. The camera is set to record with an ROI of 544x728 at 200FPS. Thank you for your help!

https://user-images.githubusercontent.com/35070165/146064650-344191a9-ac56-4a9e-aee5-fdc37bc05e73.mp4

NiklasKroeger-AlliedVision commented 2 years ago

Your video function seems to set a different output resolution for the file than you say your camera is recording:

frontVid = cv2.VideoWriter('output/testVideo.avi',fourcc,10,(1456,1088)) [...] The camera is set to record with an ROI of 544x728
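
The mismatch quoted above can be checked before any frame is written. The following is only a minimal sketch: the helper names are made up for illustration, and it assumes that "544x728" means width x height, so the frame arrives as a NumPy array of shape (728, 544). As far as I know, cv2.VideoWriter tends to drop frames whose size does not match the size it was opened with, without raising an error, so an explicit check like this can save some debugging.

```python
from typing import Sequence, Tuple


def writer_size_from_shape(shape: Sequence[int]) -> Tuple[int, int]:
    """Convert a NumPy-style shape (height, width[, channels]) into the
    (width, height) order that cv2.VideoWriter expects as frame size."""
    height, width = shape[0], shape[1]
    return (width, height)


def matches_writer_size(shape: Sequence[int], writer_size: Tuple[int, int]) -> bool:
    """Return True if a frame of the given shape fits the configured writer size."""
    return writer_size_from_shape(shape) == tuple(writer_size)


# Assumption: a 544x728 (width x height) ROI yields frames of shape (728, 544).
roi_shape = (728, 544)
print(writer_size_from_shape(roi_shape))
print(matches_writer_size(roi_shape, (1456, 1088)))
```

With a real frame, `writer_size_from_shape(frame.shape)` could be passed directly as the size argument of cv2.VideoWriter instead of a hard-coded tuple.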

Below is an adjusted version of the code you referenced that records the camera frames straight to a video. Note that this video is MJPG encoded, not H264 as in your code example. I did this because the Windows machine I use for testing does not have the libraries needed for H264 encoding. This may also be why your video file is not written properly. See, for example, the official OpenCV documentation for the VideoWriter class, which mentions the backends it uses.

import functools
import queue
import threading
import time

import cv2
import numpy as np
import vimba

# writer is a global object to make it easier to properly close it after the writer thread is done.
# There are certainly better ways to do this!
writer = None

class ImageRecorder:
    def __init__(self, cam: vimba.Camera, frame_queue: queue.Queue):
        self._cam = cam
        self._queue = frame_queue

    def __call__(self, cam: vimba.Camera, frame: vimba.Frame):
        # Place the image data as an opencv image and the frame ID into the queue
        if frame.get_status() == vimba.FrameStatus.Complete:
            self._queue.put((frame.as_numpy_ndarray(), frame.get_id()))

        # Hand used frame back to Vimba so it can store the next image in this memory
        cam.queue_frame(frame)

    def _setup_software_triggering(self):
        # Always set the selector first so that following features are applied correctly!
        self._cam.TriggerSelector.set('FrameStart')

        # optional in this example but good practice as it might be needed for hardware triggering
        self._cam.TriggerActivation.set('RisingEdge')

        # Make camera listen to Software trigger
        self._cam.TriggerSource.set('Software')
        self._cam.TriggerMode.set('On')

    def record_images(self, num_pics: int):
        # This method assumes software trigger is desired. Free run image acquisition would work
        # similarly to get higher fps from the camera
        with vimba.Vimba.get_instance():
            with self._cam:
                try:
                    self._setup_software_triggering()
                    self._cam.start_streaming(handler=self)
                    for i in range(num_pics):
                        print(i)
                        self._cam.TriggerSoftware.run()
                        # Delay between images can be adjusted or removed entirely
                        time.sleep(0.1)
                finally:
                    self._cam.stop_streaming()

def write_image(frame_queue: queue.Queue):
    global writer

    while True:
        img, id = frame_queue.get()
        print(f"took image {id} from queue")
        if writer is None:
            height, width = img.shape[0:2]
            size = (width, height)
            print(size)
            fourcc = cv2.VideoWriter_fourcc(*'MJPG')
            # Warning: the size is expected as (width, height); NumPy shapes are usually (height, width)
            writer = cv2.VideoWriter('filename.avi',
                                     fourcc,
                                     25,
                                     size)
        writer.write(cv2.cvtColor(img, cv2.COLOR_GRAY2BGR))
        frame_queue.task_done()

def main():
    num_pics = 25
    with vimba.Vimba.get_instance() as vmb:
        cams = vmb.get_all_cameras()

        frame_queue = queue.Queue()
        recorder = ImageRecorder(cam=cams[0], frame_queue=frame_queue)
        # Start a thread that runs write_image(frame_queue). Marking it as daemon allows the python
        # program to exit even though that thread is still running. The thread will then be stopped
        # when the program exits
        threading.Thread(target=functools.partial(write_image, frame_queue), daemon=True).start()

        recorder.record_images(num_pics=num_pics)
        frame_queue.join()
        # release the writer instance to finalize writing to disk
        writer.release()

if __name__ == "__main__":
    main()

nisckhan13 commented 2 years ago

Thank you so much for this response. I was able to get video streaming to work (installing the ffmpeg library was the solution for the video writing issue). I am now attempting to get higher fps recording with multiple cameras simultaneously with multi-threading. However, it seems that "vimba.Vimba.get_instance()" is a singleton and, due to this, my cameras are only recording sequentially (i.e. the second one starts recording once the first one is complete). My code is shown below. Is there a way to get asynchronous recording while using multithreading with this API? In my ideal case, I would like to record from 4 cameras at the same time.

import queue
import threading
import time

import cv2
import numpy as np
import vimba

from functools import partial
from typing import List

class ImageRecorder:
    #def __init__(self, cam: vimba.Camera, frame_queue: queue.Queue, outputName: str):
    def __init__(self, cam: vimba.Camera, outputName: str):
        self._cam = cam
        self._queue = None
        self._outputFileName = outputName
        self._done = True

    def __call__(self, cam: vimba.Camera, frame: vimba.Frame):
        # Place the image data as an opencv image and the frame ID into the queue
        if frame.get_status() == vimba.FrameStatus.Complete:
            self._queue.put((frame.as_numpy_ndarray(), frame.get_id()))

        # Hand used frame back to Vimba so it can store the next image in this memory
        cam.queue_frame(frame)

    def _setup_software_triggering(self):
        # Always set the selector first so that following features are applied correctly!
        self._cam.TriggerSelector.set('FrameStart')

        # optional in this example but good practice as it might be needed for hardware triggering
        self._cam.TriggerActivation.set('RisingEdge')

        # Make camera listen to Software trigger
        self._cam.TriggerSource.set('Software')
        self._cam.TriggerMode.set('Off')        

    def record_image(self, time_record: int):
        self._done = False
        self._queue = queue.Queue()
        # This method assumes software trigger is desired. Free run image acquisition would work
        # similarly to get higher fps from the camera
        self._done = False
        writer_thread = threading.Thread(target=self.writer_func)
        with vimba.Vimba.get_instance() as vmb:
            print("vmb", vmb)
            with self._cam:
                try:       
                    self._setup_software_triggering()
                    self._cam.start_streaming(handler=self)
                    self._cam.TriggerSoftware.run()
                    writer_thread.start()
                    time.sleep(time_record)
                finally:
                    self._cam.stop_streaming()
                    self._done = True
                    self._queue.join()

    def writer_func(self):
        img, _ = self._queue.get(timeout=2)
        height, width = img.shape[0:2]
        size = (width, height)
        fourcc = cv2.VideoWriter_fourcc(*'H264')
        writer = cv2.VideoWriter(self._outputFileName,
                                 fourcc,
                                 200,
                                 size)
        writer.write(cv2.cvtColor(img, cv2.COLOR_GRAY2BGR))
        self._queue.task_done()

        while not self._done or not self._queue.empty():
            img, _ = self._queue.get(timeout=1)
            writer.write(cv2.cvtColor(img, cv2.COLOR_GRAY2BGR))
            self._queue.task_done()

def main():
    time_record = 5
    outputNames = ['one', 'two', 'three', 'four']
    print('111')
    with vimba.Vimba.get_instance() as vmb:
        cams = vmb.get_all_cameras()
        print('Cams initialized')
        threads = []
        recorders: List[ImageRecorder] = []
        for i, cam in enumerate(cams):
            print('Cam ' + str(i))

            recorder = ImageRecorder(cam, outputName =outputNames[i] + '.mp4')
            recorders.append(recorder)
            # Start a thread that runs record_images(time_record). Marking it as daemon allows the python
            # program to exit even though that thread is still running. The thread will then be stopped
            # when the program exits
            t = threading.Thread(target=partial(recorder.record_image, time_record), daemon=True)
            threads.append(t)
            t.start()
            # recorder.record_image(time_record=time_record)     

        for i, t in enumerate(threads):
            print("Joining thread ", i)
            t.join()

if __name__ == "__main__":
    main()

NiklasKroeger-AlliedVision commented 2 years ago

Sorry for the long delay!

it seems that 'vimba.Vimba.get_instance()' is a singleton and, due to this, my cameras are only recording sequentially

It is correct that the Vimba.get_instance() method returns a singleton. But this should not cause any problems with opening multiple cameras at the same time and using them in parallel.

Unfortunately, I was also not able to properly record multiple camera streams at the same time with your code. I ran into some queue.Empty exceptions and decided that, rather than fix those, I would suggest an alternative implementation based on my original ImageRecorder class. Below you can find a code snippet that works on my machine to record two cameras at the same time using the cameras' free-run mode (NOT using triggering). All required functionality is encapsulated in the VideoRecorder class. It takes a Camera object, a filename, and a record duration in seconds (defaults to 10 seconds) as parameters. I hope this is helpful. If you run into further problems, feel free to comment here again!

import queue
import threading
import time
import pathlib

import cv2
import vimba

class VideoRecorder:
    def __init__(self, cam: vimba.Camera, filename: pathlib.Path, duration: int = 10):
        self._cam = cam
        self._queue = queue.Queue()
        self._filename = filename
        self._record_duration = duration
        # _writer is instantiated in `write_video_file` because there we know the shape of the
        # frames that should be stored in the video file
        self._writer = None

    def __call__(self, cam: vimba.Camera, frame: vimba.Frame):
        # Place the image data as an opencv image and the frame ID into the queue
        if frame.get_status() == vimba.FrameStatus.Complete:
            self._queue.put((frame.as_numpy_ndarray(), frame.get_id()))

        # Hand used frame back to Vimba so it can store the next image in this memory
        cam.queue_frame(frame)

    def record(self):
        with vimba.Vimba.get_instance():
            with self._cam:
                try:
                    threading.Thread(target=self.write_video_file, daemon=True).start()
                    self._cam.start_streaming(handler=self)
                    print(f'{self._cam} is recording')
                    # Very crude way to get the recorder to record for a certain time
                    time.sleep(self._record_duration)
                finally:
                    self._cam.stop_streaming()
                    print(f'{self._cam} is done recording')
                    # Wait until all frames have been taken from the queue
                    self._queue.join()
                    # Close the writer instance that was created to ensure file is finished properly
                    self._writer.release()
                    print(f'{self._writer} is done writing to file')

    def write_video_file(self):
        while True:
            img, id = self._queue.get()
            if id % 100 == 0:
                print(f"took image {id} from queue")
            if self._writer is None:
                height, width = img.shape[0:2]
                fourcc = cv2.VideoWriter_fourcc(*'MJPG')
                # Warning: the size is expected as (width, height), while NumPy shapes are usually
                # (height, width). The filename is converted with str() in case a pathlib.Path
                # is passed.
                self._writer = cv2.VideoWriter(str(self._filename),
                                               fourcc,
                                               25,
                                               (width, height))
            self._writer.write(cv2.cvtColor(img, cv2.COLOR_GRAY2BGR))
            self._queue.task_done()

def main():
    with vimba.Vimba.get_instance() as vmb:
        cams = vmb.get_all_cameras()
        recorders = [VideoRecorder(cam=cam, filename=f'{i}.avi') for i, cam in enumerate(cams)]
        print(f'prepared {len(recorders)} recorders: {list(map(str, recorders))}')
        # Prepare a thread that will run every recorders `record` method. That method blocks until
        # recording is finished
        workers = [threading.Thread(target=recorder.record) for recorder in recorders]
        for worker in workers:
            print(f'starting {worker}')
            worker.start()
        print('All workers running. Waiting for them to finish')
        for worker in workers:
            worker.join()
        print('All workers done')

if __name__ == "__main__":
    main()
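
The queue.Empty exceptions mentioned above come from calling `get()` with a timeout. One alternative is to let the writer thread block without a timeout and stop it with a sentinel object once streaming has ended. The sketch below is not part of the recorder above: it uses only the standard library, the names `_STOP` and `consume_frames` are made up, and a plain list stands in for the cv2.VideoWriter.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel that marks the end of the stream


def consume_frames(frame_queue: queue.Queue, sink: list) -> None:
    """Take items from the queue until the sentinel arrives."""
    while True:
        item = frame_queue.get()  # blocks without a timeout, so no queue.Empty
        try:
            if item is _STOP:
                return
            sink.append(item)  # stand-in for writer.write(...)
        finally:
            frame_queue.task_done()


frame_queue = queue.Queue()
written = []
consumer = threading.Thread(target=consume_frames, args=(frame_queue, written))
consumer.start()

# Stand-in for frames delivered by the camera callback.
for frame_id in range(5):
    frame_queue.put((f"frame-{frame_id}", frame_id))

# In a real recorder the sentinel would be queued after stop_streaming().
frame_queue.put(_STOP)
consumer.join()  # the consumer exits on its own, so it need not be a daemon thread
print(len(written))
```

Because the consumer thread terminates by itself, the main thread can join it and only then release the writer, instead of relying on a daemon thread being killed at exit.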

inchinn1 commented 2 years ago

I have an Allied Vision Alvium 1800 U-158m and would like to trigger video recording periodically using the GPIO. The code you posted above works well for saving video, but I am unsure how to modify it to wait for a trigger. The trigger port is wired correctly, as I can trigger individual frames using Vimba Viewer.

I tried modifying the record function to check whether cam.is_streaming() is true, but it seems '_cam.start_streaming' sets this to true regardless of whether the camera is waiting for a trigger.

def record(self):
        with vimba.Vimba.get_instance():
            with self._cam:
                try:
                    threading.Thread(target=self.write_video_file, daemon=True).start()

                    self._cam.LineSelector.set('Line0')
                    self._cam.LineMode.set('Input')
                    self._cam.TriggerSource.set('Line0')
                    self._cam.TriggerSelector.set('FrameStart')
                    self._cam.TriggerActivation.set('RisingEdge')
                    self._cam.TriggerMode.set('On')
                    self._cam.AcquisitionMode.set('Continuous')
                    print(f'{self._cam} is waiting for trigger')

                    self._cam.start_streaming(handler=self)
                    if self._cam.is_streaming():
                        print(f'{self._cam} is recording')
                        # Very crude way to get the recorder to record for a certain time
                        time.sleep(self._record_duration)
                finally:
                    self._cam.stop_streaming()
                    print(f'{self._cam} is done recording')
                    # Wait until all frames have been taken from the queue
                    self._queue.join()
                    # Close the writer instance that was created to ensure file is finished properly
                    self._writer.release()
                    print(f'{self._writer} is done writing to file')

Any help would be greatly appreciated
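
One possible way to start the recording clock only when the first triggered frame arrives is to have the frame handler set a threading.Event and let the record function wait on that event instead of checking is_streaming(). This is only a sketch under assumptions: it uses the standard library alone, the class `TriggerAwareTimer` is hypothetical, and a threading.Timer simulates the hardware trigger in place of a real camera.

```python
import threading
import time
from typing import Optional


class TriggerAwareTimer:
    """Starts the recording period only once the first frame has arrived."""

    def __init__(self) -> None:
        self._first_frame = threading.Event()

    def notify_frame(self) -> None:
        # Intended to be called from the frame handler, e.g. at the top of __call__.
        self._first_frame.set()

    def wait_and_record(self, duration: float,
                        trigger_timeout: Optional[float] = None) -> bool:
        # Block until the first frame arrives; return False if none came in time.
        if not self._first_frame.wait(timeout=trigger_timeout):
            return False
        time.sleep(duration)  # same crude fixed-duration wait as in the code above
        return True


timer = TriggerAwareTimer()
# Simulate a hardware trigger that produces the first frame after 50 ms.
threading.Timer(0.05, timer.notify_frame).start()
recorded = timer.wait_and_record(duration=0.1, trigger_timeout=2.0)
print(recorded)
```

In the record function, `wait_and_record` would take the place of the plain `time.sleep(self._record_duration)`. If no trigger ever arrives, no writer is created, so the `self._writer.release()` call in the finally block would need a check for None.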

nisckhan13 commented 2 years ago

I actually have a similar question. I modified the code to allow for hardware triggering (I was previously software triggering). I'm using MATLAB connected to a BNC-2110 terminal block (https://www.ni.com/en-us/support/model.bnc-2110.html) to trigger the cameras via their GPIO headers (triggering each camera at 200Hz).

I have a strange issue where the code functions as expected if I send the hardware trigger to the cameras before starting the recording in Python. All four cameras record their respective videos for the specified length of time and at the desired frame rate.

However, if I start my Python code first and have the cameras wait for a trigger, only the first camera records and the other cameras just time out with a "Queue is empty" error.

My code is below. My main question is: how do I make the cameras wait/listen for a hardware trigger? I appreciate any help!

#settings
runtime=30
mouse_name='testmouse1'
runnum=12
import queue
import threading
import time

import cv2
import numpy as np
import vimba

from functools import partial
from typing import List
class ImageRecorder:
    def __init__(self, cam: vimba.Camera, outputName: str):
        self._cam = cam
        self._queue = None
        self._outputFileName = outputName
        self._done = True

    def __call__(self, cam: vimba.Camera, frame: vimba.Frame):
        # Place the image data as an opencv image and the frame ID into the queue
        if frame.get_status() == vimba.FrameStatus.Complete:
            self._queue.put((frame.as_numpy_ndarray(), frame.get_id()))

        # Hand used frame back to Vimba so it can store the next image in this memory
        cam.queue_frame(frame)

    def record_image(self, time_record: int):
        self._done = False
        self._queue = queue.Queue()
        # This method assumes software trigger is desired. Free run image acquisition would work
        # similarly to get higher fps from the camera
        writer_thread = threading.Thread(target=self.writer_func)
        with vimba.Vimba.get_instance():
            with self._cam:
                try:       
                    self._cam.start_streaming(handler=self)
                    writer_thread.start()
                    time.sleep(time_record)
                finally:
                    self._cam.stop_streaming()
                    self._done = True
                    self._queue.join()

    def writer_func(self):
        img, _ = self._queue.get(timeout=30)
        height, width = img.shape[0:2]
        size = (width, height)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(self._outputFileName,
                                 fourcc,
                                 200,
                                 size,0)
        writer.write(img)
        self._queue.task_done()

        while not self._done or not self._queue.empty():
            try:
                img, _ = self._queue.get(timeout=3)
                writer.write(img)
            except queue.Empty:
                pass
                print('Queue is empty.')
            else:
                self._queue.task_done()
def main():
    time_record = runtime
    outputNames=['_cam4','_cam5']
    for i in np.arange(len(outputNames)):
        outputNames[i]=  'D:\\Python\\PythonAcq_Live\\'  +  mouse_name + '\\' + 'run' + str(runnum) + outputNames[i]   

    with vimba.Vimba.get_instance() as vmb:
        cams = vmb.get_all_cameras()
        threads = []
        recorders: List[ImageRecorder] =  []
        for i, cam in enumerate(cams):
            print('Cam ' + str(i))
            recorder = ImageRecorder(cam, outputName =outputNames[i] + '.mp4')
            recorders.append(recorder)
            # Start a thread that runs record_images(time_record). Marking it as daemon allows the python
            # program to exit even though that thread is still running. The thread will then be stopped
            # when the program exits
            t = threading.Thread(target=partial(recorder.record_image, time_record), daemon=True)
            threads.append(t)
            t.start()     

        for i, t in enumerate(threads):
            print("Joining thread ", i)
            t.join()
if __name__ == "__main__":
    main()

I am still using Alvium 1800U-158m with Vimba 5.1 and recording at 800x800 at 200fps. Below are the trigger settings for the camera:

cam.TriggerSelector.set('FrameStart')
cam.TriggerMode.set('On')
cam.TriggerSource.set('Line0')
cam.TriggerActivation.set('RisingEdge')

bengroegerILK commented 5 months ago

Hello, I am working with an Alvium 1800U-2040m and I also want to create a video to detect time-dependent deformations of flexible structures. Due to the upgrade from Vimba to Vimba X by Allied Vision, NiklasKroeger's code doesn't work properly. Instead of "import vimba" I have to use "import vmbpy". I also changed this throughout the script (vimba to vmbpy), but errors occur because of unexpected arguments etc. Is there an updated script for the new module?

Any help would be greatly appreciated

P.S.: I am a total beginner with the Python API.
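
As far as I know, two differences in VmbPy are relevant to the scripts in this thread: the entry point is `VmbSystem.get_instance()` instead of `Vimba.get_instance()`, and the frame callback receives a third `stream` argument, which may be what causes the "unexpected arguments" errors. The sketch below has not been tested against a camera. The class name `QueueingFrameHandler` is made up, and the "complete" status is passed in as a parameter so that the handler can be tried with stand-in objects. Please check the details against the VmbPy examples.

```python
import queue


class QueueingFrameHandler:
    """Frame callback in the three-argument form (cam, stream, frame)."""

    def __init__(self, frame_queue: queue.Queue, complete_status) -> None:
        self._queue = frame_queue
        # Assumption: with the real library this is vmbpy.FrameStatus.Complete.
        self._complete_status = complete_status

    def __call__(self, cam, stream, frame) -> None:
        # Queue the image data and frame ID, as in the recorders earlier in the thread.
        if frame.get_status() == self._complete_status:
            self._queue.put((frame.as_numpy_ndarray(), frame.get_id()))
        # Hand the frame back so its buffer can be reused for the next image.
        cam.queue_frame(frame)


# Hypothetical wiring with a real camera (untested):
#   import vmbpy
#   with vmbpy.VmbSystem.get_instance() as vmb:
#       cam = vmb.get_all_cameras()[0]
#       with cam:
#           handler = QueueingFrameHandler(queue.Queue(), vmbpy.FrameStatus.Complete)
#           cam.start_streaming(handler=handler)
```

The rest of the VideoRecorder logic (writer thread, queue, cv2.VideoWriter) does not depend on the camera library and should carry over unchanged.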