alliedvision / VmbPy

Python API of the Vimba X SDK
BSD 2-Clause "Simplified" License

Memory leak when restarting streaming #13

Closed: emasty closed this issue 1 year ago

emasty commented 1 year ago

Issue: When calling camera.py#start_streaming, memory is consumed that is not released after calling camera.py#stop_streaming.

How to reproduce: Run the example asynchronous_grab_opencv.py with the camera module 1800 U-2050m. Extend the code to stop and restart streaming multiple times. Every restart increases memory consumption.

Additional information: The memory leak is reduced when using AllocationMode#AllocAndAnnounceFrame, but it is still not completely resolved. Some memory still does not seem to be released after calling stop_streaming.

https://github.com/alliedvision/VmbPy/blob/48ad45529690291c095047c178d259e71b7e3f5e/Examples/asynchronous_grab_opencv.py#L191

NiklasKroeger-AlliedVision commented 1 year ago

When start_streaming is called, the requested number of frame buffers is allocated. Upon calling stop_streaming, internal references to these frame buffers are removed. If the user still holds references to some of the Frame instances that the API provides, these Frame instances (and their corresponding allocated memory buffers) will not be freed by the Python garbage collector.
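A minimal plain-Python sketch of that rule. `FrameBuffer` and the `internal_refs` list are hypothetical stand-ins for VmbPy's frames and its internal bookkeeping (neither is VmbPy code). The frame object that user code keeps survives after the "API" drops its references, while the frame whose pixels were copied out can be freed; on CPython this happens immediately through reference counting when no reference cycle is involved.

```python
import weakref


class FrameBuffer:
    """Hypothetical stand-in for a frame object that owns an image buffer."""

    def __init__(self, size: int) -> None:
        self.data = bytearray(size)


# Hypothetical: references the API holds while streaming is active.
internal_refs = [FrameBuffer(1024) for _ in range(3)]
watchers = [weakref.ref(f) for f in internal_refs]  # observe lifetime without keeping frames alive

# User code keeps one whole frame object, and only a copy of another frame's pixels.
kept_frame = internal_refs[0]
pixel_copy = bytes(internal_refs[1].data)

# Equivalent of stop_streaming: the API drops its internal references.
internal_refs.clear()

alive = [w() is not None for w in watchers]
print(alive)  # prints [True, False, False] on CPython
```

Only the frame referenced by `kept_frame` is still alive; copying the data you need and dropping the frame itself lets its buffer be released.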

I assume that what you are seeing is just the Python garbage collector being slow. The garbage collection is run periodically and memory is only freed when some conditions are fulfilled. You can force manual garbage collection by using the function gc.collect from the built-in gc module. If you execute this after calling stop_streaming this might free the allocated frame buffers (if there are no other references to the Frame objects that use them).
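The suggestion above can be wrapped in a small helper. This sketch makes two assumptions worth stating: `stop_and_collect` is a hypothetical helper (not part of VmbPy) that accepts any object with a `stop_streaming()` method, such as an opened `vmbpy.Camera`, and `CyclicFrame` is a hypothetical object caught in a reference cycle, used only to show why an explicit `gc.collect()` can free memory that reference counting alone does not. It does not claim that VmbPy's Frame objects actually form such cycles.

```python
import gc
import weakref


class CyclicFrame:
    """Hypothetical stand-in for a frame object caught in a reference cycle."""

    def __init__(self, size: int) -> None:
        self.buffer = bytearray(size)  # pretend image buffer
        self.me = self                 # cycle: reference counting alone never frees this


def stop_and_collect(cam) -> int:
    """Stop streaming on `cam` (e.g. an opened vmbpy.Camera), then force a full collection.

    Returns the number of unreachable objects gc.collect() found.
    """
    cam.stop_streaming()
    return gc.collect()


if __name__ == '__main__':
    gc.disable()                       # make the demo deterministic: no automatic collections
    frame = CyclicFrame(1024 * 1024)
    probe = weakref.ref(frame)
    del frame                          # last external reference gone, the cycle keeps it alive
    print('alive before collect:', probe() is not None)  # True
    gc.collect()
    print('alive after collect:', probe() is not None)   # False
    gc.enable()
```

With a real camera, the helper would simply replace a bare `cam.stop_streaming()` call at the point where streaming is stopped.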

Unfortunately, this ever-growing memory behaviour is more or less normal for garbage-collected languages, as memory is only released when the garbage collector frees it. If you have hard requirements regarding the predictability of memory consumption, it might make sense to consider one of our other APIs, like VmbC or VmbCPP. VmbCPP and VmbPy have been built to have very similar class structures, so migrating an existing code base from VmbPy to VmbCPP should hopefully be rather simple.

emasty commented 1 year ago

Many thanks for the hint!

I can confirm that adding a manual trigger for the garbage collector after stop_streaming did clear the memory allocation and memory consumption is now relatively stable.

(I should have tried that earlier, as I had already tried manually triggering gc in a different test.)

Blauschirm commented 6 months ago

@NiklasKroeger-AlliedVision

Sorry for reusing an already closed issue, but I am investigating the exact same problem and reproduced it in the asynchronous_grab_opencv.py example as well. The hint to use gc.collect() already helped a bit, but I can still see memory usage increasing for up to roughly 40 iterations of the example. I keep the Python process running but execute the example in a loop: each iteration grabs exactly 30 images, joins all three threads and triggers gc.collect() before starting all three threads again.

With the manual gc call the behaviour is much more predictable, but this steady increase from only 130 MB to 600 MB is still something I would like to prevent.

I recorded process memory usage after every iteration with psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2

[Figure: process memory usage (MB) recorded after each iteration]
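The rss value from psutil counts every page the process holds, including memory that Python has freed internally but not handed back to the OS. To check whether Python objects (for example frame copies) are really accumulating, one option is the standard-library `tracemalloc` module, which only tracks memory allocated through Python's allocators. A minimal sketch; `run_iterations` and the `work` callables are hypothetical names, and buffers allocated directly by native code (for example inside VmbC) are not traced.

```python
import tracemalloc
from typing import Callable, List


def run_iterations(work: Callable[[], None], iterations: int) -> List[int]:
    """Call `work` repeatedly and record traced Python memory (bytes) after each call."""
    tracemalloc.start()
    history: List[int] = []
    try:
        for _ in range(iterations):
            work()
            current, _peak = tracemalloc.get_traced_memory()
            history.append(current)
    finally:
        tracemalloc.stop()
    return history


if __name__ == '__main__':
    leaked = []

    def leaky_work() -> None:
        leaked.append(bytearray(100_000))  # reference kept: traced memory keeps growing

    def clean_work() -> None:
        bytearray(100_000)                 # reference dropped: traced memory stays flat

    print('clean:', run_iterations(clean_work, 5))
    print('leaky:', run_iterations(leaky_work, 5))
```

If the traced values stay flat while rss still grows, the growth is more likely allocator or native memory not returned to the OS than live Python objects.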

NiklasKroeger-AlliedVision commented 6 months ago

@Blauschirm If you want to limit the amount of memory used by your Python process, my recommendation would be to not stop and start acquisition multiple times. VmbPy is not designed to take in any user-allocated frames when the stream is started, which means that every stream start allocates new frames. The growing memory consumption you see is the Python process not releasing memory back to the operating system.

With that in mind: you mention always recording 30 frames, stopping acquisition and starting it again. I assume that you are doing something with these 30 frames, like processing them as a batch? If so, a better way to record a certain number of images, stop acquisition so you have time to process them, and restart acquisition once you are done might be to use a different Acquisition Mode on your camera. I would recommend looking at the MultiFrame Acquisition Mode.

Unfortunately you do not mention the camera model you are using, but I will just link to our Alvium Feature reference which can be found here: https://cdn.alliedvision.com/fileadmin/content/documents/products/cameras/various/features/Alvium_Features_Reference.pdf

In that document see the description for "AcquisitionMode" for some details of possible acquisition modes.

The MultiFrame mode I mentioned will basically work like this:

  1. Execute AcquisitionStart command
  2. Camera will record however many frames you set in the AcquisitionFrameCount feature
  3. Camera will automatically stop acquisition

If you want to record another "group" or "batch" of images, go back to step 1 and execute the AcquisitionStart command again.

I have prepared a small code example of how a Python script could be written, that uses this acquisition mode to record images in "batches" and process them. This is one of many possible implementations, so do not feel obliged to follow that structure precisely. The example is written to be easy to follow and concise and depending on your task other architectures might suit your needs better.

import queue
import threading
import time

import vmbpy

IMAGE_BATCH_SIZE = 30

class Producer:
    '''Will record IMAGE_BATCH_SIZE frames and hold them in `output_queue`'''

    def __init__(self, cam: vmbpy.Camera) -> None:
        self.output_queue = queue.Queue(IMAGE_BATCH_SIZE)
        self.cam = cam

    def __call__(self, cam: vmbpy.Camera, stream: vmbpy.Stream, frame: vmbpy.Frame) -> None:
        '''Callback executed for every recorded frame'''
        # Just to indicate a new frame was recorded
        print('.', end='', flush=True)
        try:
            self.output_queue.put_nowait(frame)
        except queue.Full:
            print(f'The Producer output Queue is full! Frame {frame} is not added to it.')
            cam.queue_frame(frame)

    def requeue_frame(self, frame: vmbpy.Frame) -> None:
        '''Return the processed frame to VmbPy for future acquisitions'''
        self.output_queue.task_done()
        self.cam.queue_frame(frame)

    def record_new_batch(self) -> None:
        '''Execute the AcquisitionStart camera feature to start recording a new image batch'''
        self.cam.AcquisitionStart.run()

class Consumer:
    '''Takes all recorded frames from the Producer and does some processing with them'''

    def __init__(self, producer: Producer) -> None:
        self.producer = producer
        self.should_stop = threading.Event()
        self.batch = 0

    def run(self) -> None:
        '''Consume frames from producer. Can be run in separate thread'''
        while not self.should_stop.is_set():
            while self.producer.output_queue.qsize() < IMAGE_BATCH_SIZE:
                # Wait until the queue holds a full batch
                time.sleep(1)
            print(f'\n========== Processing batch {self.batch:03} ==========')
            while self.producer.output_queue.qsize() > 0:
                # Consume all frames
                try:
                    frame = self.producer.output_queue.get_nowait()
                    print(
                        f'Consuming frame {frame}... {frame.as_numpy_ndarray().mean():.10} ', end='')
                    time.sleep(0.1)  # Simulating actual processing work for every frame
                    print('Done!')
                    # Remember to requeue the consumed frame for future transmissions!
                    self.producer.requeue_frame(frame)
                except queue.Empty:
                    # Queue is empty. Stop consuming and wait for queue to fill up again
                    break
            # We are done processing a batch. Request producer to record a new one
            print(f'========== Done with batch {self.batch:03}. Recording new one ==========')
            self.batch += 1
            self.producer.record_new_batch()

if __name__ == '__main__':
    with vmbpy.VmbSystem.get_instance() as vmb:
        with vmb.get_all_cameras()[0] as cam:
            # Set up camera to use MultiFrame acquisition
            cam.AcquisitionMode.set('MultiFrame')
            cam.AcquisitionFrameCount.set(IMAGE_BATCH_SIZE)
            producer = Producer(cam)
            consumer = Consumer(producer)
            # This way we tell VmbPy to allocate the exact number of frames we want to use as one
            # image batch. We can record that many images, take some time to process the batch, hand
            # the processed frame buffers back to VmbPy and tell the device to record the next batch
            try:
                cam.start_streaming(producer, buffer_count=IMAGE_BATCH_SIZE)
                consumer_thread = threading.Thread(target=consumer.run)
                consumer_thread.start()
                input('<Press Enter to stop>\n')
                consumer.should_stop.set()
                producer.output_queue.join()
                consumer_thread.join()
            finally:
                cam.stop_streaming()

With this I do not get any unwanted allocations on my system and the memory consumption is totally flat across recorded and processed batches. The screenshot below also nicely shows the bursts in IO while new batches are recorded and the pause during which the frames are processed (or more accurately the sleep in the example above 😉)

[Screenshot taken from Process Explorer to get a quick idea of memory usage]

Please be aware that if you, for example, transform the recorded image to a different pixel format, this might introduce allocations again, leading to different behaviour on your system.
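As a plain-Python illustration of that point (not how VmbPy converts frames internally): a conversion that returns a new buffer allocates once per frame, while writing into a preallocated output buffer reuses the same memory for every frame. The Mono8 to Rgb8 expansion below is a hypothetical toy conversion chosen only because it is easy to express with `bytearray` slicing.

```python
def mono8_to_rgb8_new(src: bytes) -> bytearray:
    """Toy conversion that allocates a fresh output buffer on every call."""
    out = bytearray(len(src) * 3)
    for channel in range(3):
        out[channel::3] = src  # copy the grey value into R, G and B
    return out


def mono8_to_rgb8_into(src: bytes, dst: bytearray) -> bytearray:
    """Same toy conversion, writing into a caller-provided buffer that can be reused."""
    if len(dst) != len(src) * 3:
        raise ValueError('dst must be exactly three times the size of src')
    for channel in range(3):
        dst[channel::3] = src
    return dst


# Allocate the output buffer once, then reuse it for every frame.
width, height = 4, 2                      # hypothetical tiny frame size
reusable = bytearray(width * height * 3)
for frame_index in range(3):
    mono_pixels = bytes([frame_index] * (width * height))  # stand-in for frame data
    rgb = mono8_to_rgb8_into(mono_pixels, reusable)       # same object every time
```

The trade-off is that the caller must finish with `reusable` before the next frame overwrites it.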

If you need further assistance please provide a code snippet and some more system information (OS, used camera, used software/firmware versions).

Blauschirm commented 5 months ago

Thank you so much for your reply. Sorry for tagging you personally; I just wasn't sure whether a comment on a closed issue would be noticed otherwise.

In our use case we need to stream at 5 to 25 fps for 20 minutes to an hour or longer, so we definitely need continuous acquisition.

I am now in contact with Allied Vision's support team, who ran my script (with the gc.collect()) as well and got this result:

[Figure: memory usage measured by the Allied Vision support team]

I ran the same test on my arm64 laptop and got much smaller memory usage, even though it still seems to creep up slowly.

[Figure: memory usage on the arm64 laptop]

On the Jetson Orin, the change from vmbpy 1.0.2 to 1.0.4 did not change anything, and I still get massive memory usage (as in the previous picture).

All three systems show some slow memory creep, though.

I will work with your support team over mail and update this issue with any progress.

My modified example of yours:

"""BSD 2-Clause License

Copyright (c) 2022, Allied Vision Technologies GmbH
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this
   list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright notice,
   this list of conditions and the following disclaimer in the documentation
   and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import copy
import gc
import queue
import threading
import time
import traceback
from random import randint
from typing import Optional

import cv2
import numpy

from vmbpy import *

FRAME_QUEUE_SIZE = 10
FRAME_HEIGHT = 1944
FRAME_WIDTH = 2584

def print_preamble():
    print('////////////////////////////////////////')
    print('/// VmbPy Multithreading Example ///////')
    print('////////////////////////////////////////\n')
    print(flush=True)

def add_camera_id(frame: Frame, cam_id: str) -> Frame:
    # Helper function inserting 'cam_id' into given frame. This function
    # manipulates the original image buffer inside frame object.
    cv2.putText(frame.as_opencv_image(), 'Cam: {}'.format(cam_id), org=(0, 30), fontScale=1,
                color=255, thickness=1, fontFace=cv2.FONT_HERSHEY_COMPLEX_SMALL)
    return frame

def resize_if_required(frame: Frame) -> numpy.ndarray:
    # Helper function resizing the given frame, if it has not the required dimensions.
    # On resizing, the image data is copied and resized, the image inside the frame object
    # is untouched.
    cv_frame = frame.as_opencv_image()

    if (frame.get_height() != FRAME_HEIGHT) or (frame.get_width() != FRAME_WIDTH):
        cv_frame = cv2.resize(cv_frame, (FRAME_WIDTH, FRAME_HEIGHT), interpolation=cv2.INTER_AREA)
        cv_frame = cv_frame[..., numpy.newaxis]

    return cv_frame

def create_dummy_frame() -> numpy.ndarray:
    cv_frame = numpy.zeros((50, 640, 1), numpy.uint8)
    cv_frame[:] = 0

    cv2.putText(cv_frame, 'No Stream available. Please connect a Camera.', org=(30, 30),
                fontScale=1, color=255, thickness=1, fontFace=cv2.FONT_HERSHEY_COMPLEX_SMALL)

    return cv_frame

def try_put_frame(q: queue.Queue, cam: Camera, frame: Optional[Frame]):
    try:
        q.put_nowait((cam.get_id(), frame))

    except queue.Full:
        pass

def set_nearest_value(cam: Camera, feat_name: str, feat_value: int):
    # Helper function that tries to set a given value. If setting of the initial value failed
    # it calculates the nearest valid value and sets the result. This function is intended to
    # be used with Height and Width Features because not all Cameras allow the same values
    # for height and width.
    feat = cam.get_feature_by_name(feat_name)

    try:
        feat.set(feat_value)

    except VmbFeatureError:
        min_, max_ = feat.get_range()
        inc = feat.get_increment()

        if feat_value <= min_:
            val = min_

        elif feat_value >= max_:
            val = max_

        else:
            val = (((feat_value - min_) // inc) * inc) + min_

        try:
            feat.set(val)
        except:
            pass

        msg = ('Camera {}: Failed to set value of Feature \'{}\' to \'{}\': '
               'Using nearest valid value \'{}\'. Note that, this causes resizing '
               'during processing, reducing the frame rate.')
        Log.get_instance().info(msg.format(cam.get_id(), feat_name, feat_value, val))

# Thread Objects
class FrameProducer(threading.Thread):
    def __init__(self, cam: Camera, frame_queue: queue.Queue):
        threading.Thread.__init__(self, name=f"FrameProducer{randint(1,100)}")

        self.log = Log.get_instance()
        self.cam = cam
        self.frame_queue = frame_queue
        self.killswitch = threading.Event()

    def __call__(self, cam: Camera, stream: Stream, frame: Frame):
        # This method is executed within VmbC context. All incoming frames
        # are reused for later frame acquisition. If a frame shall be queued, the
        # frame must be copied and the copy must be sent, otherwise the acquired
        # frame will be overridden as soon as the frame is reused.
        if frame.get_status() == FrameStatus.Complete:

            if not self.frame_queue.full():
                frame_cpy = copy.deepcopy(frame)
                try_put_frame(self.frame_queue, cam, frame_cpy)

        cam.queue_frame(frame)

    def stop(self):
        self.killswitch.set()

    def setup_camera(self):
        set_nearest_value(self.cam, 'Height', FRAME_HEIGHT)
        set_nearest_value(self.cam, 'Width', FRAME_WIDTH)

        # Try to enable automatic exposure time setting
        try:
            self.cam.ExposureAuto.set('Once')

        except (AttributeError, VmbFeatureError):
            self.log.info('Camera {}: Failed to set Feature \'ExposureAuto\'.'.format(
                          self.cam.get_id()))

        try:
            self.cam.set_pixel_format(PixelFormat.Rgb8)
        except:
            pass

    def run(self):
        self.log.info('Thread \'FrameProducer({})\' started.'.format(self.cam.get_id()))

        try:
            with self.cam:
                self.setup_camera()

                try:
                    self.cam.start_streaming(self)
                    self.killswitch.wait()

                finally:
                    self.cam.stop_streaming()

        except VmbCameraError as e:
            traceback.print_exc()
            print("failed to start or stop Stream")

        finally:
            try_put_frame(self.frame_queue, self.cam, None)

        self.log.info('Thread \'FrameProducer({})\' terminated.'.format(self.cam.get_id()))

class FrameConsumer(threading.Thread):
    def __init__(self, frame_queue: queue.Queue):
        name = f"FrameConsumer{randint(1,100)}"
        threading.Thread.__init__(self, name=name)
        self.name = name
        self.log = Log.get_instance()
        self.frame_queue = frame_queue

    def run(self):
        IMAGE_CAPTION = 'Multithreading Example: Press <Enter> to exit'
        KEY_CODE_ENTER = 13

        frames = {}
        alive = True
        frames_received = 0

        self.log.info('Thread \'FrameConsumer\' started.')
        while alive:
            # Update current state by dequeuing all currently available frames.
            frames_left = self.frame_queue.qsize()
            while frames_left:
                try:
                    cam_id, frame = self.frame_queue.get_nowait()

                except queue.Empty:
                    break

                # Add/Remove frame from current state.
                if frame:
                    frames[cam_id] = frame
                    frames_received += 1

                else:
                    frames.pop(cam_id, None)

                frames_left -= 1

            if frames_received >= 50:

                print("grabbed 50 frames with shape: ")
                print(frame.as_numpy_ndarray().shape)
                break
            else:
                continue

        self.log.info('Thread \'FrameConsumer\' terminated.')

class MainThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self, name=f"MainThread{randint(1,100)}")

        self.frame_queue = queue.Queue(maxsize=FRAME_QUEUE_SIZE)
        self.producers = {}
        self.producers_lock = threading.Lock()

    def __call__(self, cam: Camera, event: CameraEvent):
        # New camera was detected. Create FrameProducer, add it to active FrameProducers
        if event == CameraEvent.Detected:
            with self.producers_lock:
                self.producers[cam.get_id()] = FrameProducer(cam, self.frame_queue)
                self.producers[cam.get_id()].start()

        # An existing camera was disconnected, stop associated FrameProducer.
        elif event == CameraEvent.Missing:
            with self.producers_lock:
                producer = self.producers.pop(cam.get_id())
                producer.stop()
                producer.join()

    def run(self):
        log = Log.get_instance()
        consumer = FrameConsumer(self.frame_queue)

        vmb = VmbSystem.get_instance()
        vmb.enable_log(LOG_CONFIG_INFO_CONSOLE_ONLY)

        log.info('Thread \'MainThread\' started.')

        with vmb:
            # Construct FrameProducer threads for all detected cameras
            for cam in vmb.get_all_cameras():
                self.producers[cam.get_id()] = FrameProducer(cam, self.frame_queue)

            # Start FrameProducer threads
            with self.producers_lock:
                for producer in self.producers.values():
                    producer.start()

            # Start and wait for consumer to terminate
            vmb.register_camera_change_handler(self)
            consumer.start()
            consumer.join()
            vmb.unregister_camera_change_handler(self)

            # Stop all FrameProducer threads
            with self.producers_lock:
                # Initiate concurrent shutdown
                for producer in self.producers.values():
                    producer.stop()

                # Wait for shutdown to complete
                for producer in self.producers.values():
                    producer.join()

        log.info('Thread \'MainThread\' terminated.')

def main():
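    main_thread = MainThread()
    main_thread.start()
    main_thread.join()
    # Force a full collection so that unreachable frame copies from this iteration can be released
    gc.collect()
    time.sleep(0.1)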

if __name__ == '__main__':

    print_preamble()

    import psutil, os, time

    log = ["Iteration; Memory usage in MB"]

    for i in range(50):
        main()
        print(f"Memory Usage after iteration {i}:")
        after_iteration = psutil.Process(
            os.getpid()).memory_info().rss / 1024 ** 2
        print(f"Self: {after_iteration} MB")
        log.append(f"{i};{after_iteration}")

    for line in log:
        print(line)
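The modified example above deep-copies every complete frame in FrameProducer.__call__. A rough back-of-the-envelope sketch of how much memory that churns, assuming the Rgb8 format the script tries to set (3 bytes per pixel; this is an assumption, since a failing set_pixel_format is silently ignored) and the 2584 × 1944 size from FRAME_WIDTH and FRAME_HEIGHT:

```python
MIB = 1024 ** 2


def frame_bytes(width: int, height: int, bytes_per_pixel: int) -> int:
    """Size of one uncompressed frame buffer in bytes."""
    return width * height * bytes_per_pixel


def copy_rate_mib_per_s(width: int, height: int, bytes_per_pixel: int, fps: float) -> float:
    """MiB allocated per second if every frame is deep-copied once."""
    return frame_bytes(width, height, bytes_per_pixel) * fps / MIB


size = frame_bytes(2584, 1944, 3)                        # FRAME_WIDTH x FRAME_HEIGHT, assuming Rgb8
print(size)                                              # 15069888 bytes, about 14.4 MiB
print(round(copy_rate_mib_per_s(2584, 1944, 3, 5), 1))   # 71.9 MiB/s at 5 fps
print(round(copy_rate_mib_per_s(2584, 1944, 3, 25), 1))  # 359.3 MiB/s at 25 fps
print(round(10 * size / MIB, 1))                         # 143.7 MiB: FRAME_QUEUE_SIZE copies held at once
```

At 25 fps this is several hundred MiB of short-lived allocations per second, so even a small fraction that the allocator keeps instead of returning to the OS can show up in rss.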
Teresa-AlliedVision commented 5 months ago

Hello @Blauschirm, for you and the other users I will summarize the explanation of this behaviour. In our experience, this is regular Python behaviour: memory "freed" by the garbage collector is not necessarily returned to the OS, because the program might need it again right away. The typical sign of this in the memory usage reported by the OS is a steady increase until a sharp drop, then another steady increase, usually to a point above the previous maximum, followed by another sharp drop when the memory is actually returned to the OS, and so on.

Aside from using a different API that lets you manage memory manually, like the C++ or C API, these are some approaches you can take when using the Python APIs:

  1. Only use cam.start_streaming() for the first stream, to initialize the frame queue, and cam.stop_streaming() at the end of the last stream, to free the frame queue. In between, use the camera features AcquisitionStop and AcquisitionStart to stop and restart the streams.
  2. If you only need individual frames or a few at a time, configure the camera for a software trigger, start the stream with start_streaming() and then send a software trigger for each frame needed.
  3. Depending on the application and constraints, there are other ways to implement frequent intermittent streaming on systems with limited memory, e.g. a hardware trigger in various configurations, timers in combination with counters, or, for complicated setups, even sequencer sets.
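Approaches 1 and 2 can be sketched as a small wrapper. This assumes an already opened `vmbpy.Camera` that exposes the AcquisitionStart/AcquisitionStop commands and, for the trigger variant, the TriggerSelector, TriggerSource, TriggerMode and TriggerSoftware features described in the Alvium feature reference. `BufferedAcquisition` and its methods are hypothetical names, not part of VmbPy; the camera is duck-typed, so the sketch can also be exercised with a stand-in object.

```python
from typing import Any, Callable


class BufferedAcquisition:
    """Hypothetical wrapper (not part of VmbPy): announce frame buffers once,
    then pause/resume acquisition or request single frames via software trigger.

    `cam` is assumed to be an opened vmbpy.Camera, or any object offering the
    same start_streaming()/stop_streaming() methods and feature attributes.
    """

    def __init__(self, cam: Any, handler: Callable[..., None], buffer_count: int = 10) -> None:
        self.cam = cam
        self.handler = handler
        self.buffer_count = buffer_count

    def use_software_trigger(self) -> None:
        """Approach 2: one frame per TriggerSoftware command. Call before entering."""
        self.cam.TriggerSelector.set('FrameStart')
        self.cam.TriggerSource.set('Software')
        self.cam.TriggerMode.set('On')

    def __enter__(self) -> 'BufferedAcquisition':
        # Frame buffers are allocated once here; start_streaming also starts acquisition.
        self.cam.start_streaming(self.handler, buffer_count=self.buffer_count)
        return self

    def pause(self) -> None:
        """Approach 1: stop acquisition but keep the announced frame buffers."""
        self.cam.AcquisitionStop.run()

    def resume(self) -> None:
        """Approach 1: restart acquisition with the same frame buffers."""
        self.cam.AcquisitionStart.run()

    def trigger(self) -> None:
        """Approach 2: request a single frame (after use_software_trigger())."""
        self.cam.TriggerSoftware.run()

    def __exit__(self, *exc_info: Any) -> bool:
        # The frame buffers are released only here, after the last stream.
        self.cam.stop_streaming()
        return False  # do not swallow exceptions
```

Usage, assuming `cam` and a frame handler as in the examples above: inside `with cam:`, open `with BufferedAcquisition(cam, handler, buffer_count=10) as acq:` and call `acq.pause()` and `acq.resume()` between recording phases instead of stopping and restarting the stream.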