thongtruongvietsol / yolov7_int8

0 stars 0 forks source link

fix #3

Open thongtruongvietsol opened 3 months ago

thongtruongvietsol commented 3 months ago

import collections import time from IPython import display import cv2 import threading

Main processing function to run object detection.

def run_object_detection( source=0, flip=False, use_popup=False, skip_first_frames=0, model= "model/yolov7-tiny_int8.xml", device=device.value, ): player = None compiled_model = core.compile_model(model, device) try:

Create a video player to play with target fps.

    player = VideoPlayer(source=source, flip=flip, fps=30, skip_first_frames=skip_first_frames)
    # Start capturing.
    player.start()
    if use_popup:
        title = "Press ESC to Exit"
        cv2.namedWindow(winname=title, flags=cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)

    processing_times = collections.deque()
    while True:
        # Grab the frame.
        frame = player.next()
        if frame is None:
            print("Source ended")
            break
        # If the frame is larger than full HD, reduce size to improve the performance.
        scale = 1280 / max(frame.shape)
        if scale < 1:
            frame = cv2.resize(
                src=frame,
                dsize=None,
                fx=scale,
                fy=scale,
                interpolation=cv2.INTER_AREA,
            )
        # Get the results.
        input_image = np.array(frame)

        start_time = time.time()
        # model expects RGB image, while video capturing in BGR
        detections, _, input_shape = detect(compiled_model, input_image[:, :, ::-1])
        stop_time = time.time()

        image_with_boxes = draw_boxes(detections[0], input_shape, input_image, DEFAULT_NAMES, COLORS)
        frame = image_with_boxes

        processing_times.append(stop_time - start_time)
        # Use processing times from last 200 frames.
        if len(processing_times) > 200:
            processing_times.popleft()

        _, f_width = frame.shape[:2]
        # Mean processing time [ms].
        processing_time = np.mean(processing_times) * 1000
        fps = 1000 / processing_time
        cv2.putText(
            img=frame,
            text=f"Inference time: {processing_time:.1f}ms ({fps:.1f} FPS)",
            org=(20, 40),
            fontFace=cv2.FONT_HERSHEY_COMPLEX,
            fontScale=f_width / 1000,
            color=(0, 0, 255),
            thickness=1,
            lineType=cv2.LINE_AA,
        )
        # Use this workaround if there is flickering.
        if use_popup:
            cv2.imshow(winname=title, mat=frame)
            key = cv2.waitKey(1)
            # escape = 27
            if key == 27:
                break
        else:
            # Encode numpy array to jpg.
            _, encoded_img = cv2.imencode(ext=".jpg", img=frame, params=[cv2.IMWRITE_JPEG_QUALITY, 100])
            # Create an IPython image.⬆️
            i = display.Image(data=encoded_img)
            # Display the image in this notebook.
            display.clear_output(wait=True)
            display.display(i)
# ctrl-c
except KeyboardInterrupt:
    print("Interrupted")
# any different error
except RuntimeError as e:
    print(e)
finally:
    if player is not None:
        # Stop capturing.
        player.stop()
    if use_popup:
        cv2.destroyAllWindows()

class VideoPlayer: """ Custom video player to fulfill FPS requirements. You can set target FPS and output size, flip the video horizontally or skip first N frames.

:param source: Video source. It could be either camera device or video file.
:param size: Output frame size.
:param flip: Flip source horizontally.
:param fps: Target FPS.
:param skip_first_frames: Skip first N frames.
"""

def __init__(self, source, size=None, flip=False, fps=None, skip_first_frames=0):
    import cv2

    self.cv2 = cv2  # This is done to access the package in class methods
    self.__cap = cv2.VideoCapture(source)
    if not self.__cap.isOpened():
        raise RuntimeError(f"Cannot open {'camera' if isinstance(source, int) else ''} {source}")
    # skip first N frames
    self.__cap.set(cv2.CAP_PROP_POS_FRAMES, skip_first_frames)
    # fps of input file
    self.__input_fps = self.__cap.get(cv2.CAP_PROP_FPS)
    if self.__input_fps <= 0:
        self.__input_fps = 60
    # target fps given by user
    self.__output_fps = fps if fps is not None else self.__input_fps
    self.__flip = flip
    self.__size = None
    self.__interpolation = None
    if size is not None:
        self.__size = size
        # AREA better for shrinking, LINEAR better for enlarging
        self.__interpolation = cv2.INTER_AREA if size[0] < self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH) else cv2.INTER_LINEAR
    # first frame
    _, self.__frame = self.__cap.read()
    self.__lock = threading.Lock()
    self.__thread = None
    self.__stop = False

"""
Start playing.
"""

def start(self):
    self.__stop = False
    self.__thread = threading.Thread(target=self.__run, daemon=True)
    self.__thread.start()

"""
Stop playing and release resources.
"""

def stop(self):
    self.__stop = True
    if self.__thread is not None:
        self.__thread.join()
    self.__cap.release()

def __run(self):
    prev_time = 0
    while not self.__stop:
        t1 = time.time()
        ret, frame = self.__cap.read()
        if not ret:
            break

        # fulfill target fps
        if 1 / self.__output_fps < time.time() - prev_time:
            prev_time = time.time()
            # replace by current frame
            with self.__lock:
                self.__frame = frame

        t2 = time.time()
        # time to wait [s] to fulfill input fps
        wait_time = 1 / self.__input_fps - (t2 - t1)
        # wait until
        time.sleep(max(0, wait_time))

    self.__frame = None

"""
Get current frame.
"""

def next(self):
    import cv2

    with self.__lock:
        if self.__frame is None:
            return None
        # need to copy frame, because can be cached and reused if fps is low
        frame = self.__frame.copy()
    if self.__size is not None:
        frame = self.cv2.resize(frame, self.__size, interpolation=self.__interpolation)
    if self.__flip:
        frame = self.cv2.flip(frame, 1)
    return frame
thongtruongvietsol commented 3 months ago

` class VideoPlayer: def init(self, source, size=None, flip=False, fps=None, skip_first_frames=0): import cv2 self.cv2 = cv2 # This is done to access the package in class methods self.cap = cv2.VideoCapture(source) if not self.cap.isOpened(): raise RuntimeError(f"Cannot open {'camera' if isinstance(source, int) else ''} {source}")

skip first N frames

    self.__cap.set(cv2.CAP_PROP_POS_FRAMES, skip_first_frames)
    # fps of input file
    self.__input_fps = self.__cap.get(cv2.CAP_PROP_FPS)
    if self.__input_fps <= 0:
        self.__input_fps = 60
    # target fps given by user
    self.__output_fps = fps if fps is not None else self.__input_fps
    self.__flip = flip
    self.__size = None
    self.__interpolation = None
    if size is not None:
        self.__size = size
        # AREA better for shrinking, LINEAR better for enlarging
        self.__interpolation = cv2.INTER_AREA if size[0] < self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH) else cv2.INTER_LINEAR
    # first frame
    _, self.__frame = self.__cap.read()
    self.__lock = threading.Lock()
    self.__thread = None
    self.__stop = False
"""
Start playing.
"""
def start(self):
    self.__stop = False
    self.__thread = threading.Thread(target=self.__run, daemon=True)
    self.__thread.start()
"""
Stop playing and release resources.
"""
def stop(self):
    self.__stop = True
    if self.__thread is not None:
        self.__thread.join()
    self.__cap.release()
def __run(self):
    prev_time = 0
    while not self.__stop:
        t1 = time.time()
        ret, frame = self.__cap.read()
        if not ret:
            break
        # fulfill target fps
        if 1 / self.__output_fps < time.time() - prev_time:
            prev_time = time.time()
            # replace by current frame
            with self.__lock:
                self.__frame = frame
        t2 = time.time()
        # time to wait [s] to fulfill input fps
        wait_time = 1 / self.__input_fps - (t2 - t1)
        # wait until
        time.sleep(max(0, wait_time))

    self.__frame = None

"""
Get current frame.
"""

def next(self):
    import cv2

    with self.__lock:
        if self.__frame is None:
            return None
        # need to copy frame, because can be cached and reused if fps is low
        frame = self.__frame.copy()
    if self.__size is not None:
        frame = self.cv2.resize(frame, self.__size, interpolation=self.__interpolation)
    if self.__flip:
        frame = self.cv2.flip(frame, 1)
    return frame

`