bml1g12 / benchmarking_multiprocessing_np_array_comms

Benchmarking communication of numpy arrays between Python processes
MIT License

Black screen while streaming video file #4

Closed: f1fan183 closed this issue 3 years ago

f1fan183 commented 3 years ago

I've applied your shared memory approach to a single camera scenario (see code below). Unfortunately, all I see is a black screen. Any thoughts on what might be causing this issue?

import multiprocessing as mp
import cv2
import numpy as np
import sys
import ctypes

IMAGE_DIMENSIONS = (512, 640, 3)
VIDEO_STREAM = r"C:\Users\japje\Videos\2_2021-04-30_18-25-23_896_trim.mp4"

def frame_stream(data_tuple, video_input):
    queue, mp_array, np_array = data_tuple
    frames_written = 0
    cap = cv2.VideoCapture(video_input, cv2.CAP_FFMPEG)
    while cap.isOpened():
        ret, frame = cap.read()
        if ret:
            mp_array.acquire()
            np_array[:] = frame
            queue.put(frames_written)
            frames_written += 1
        else:
            break
    cv2.destroyAllWindows()
    print("Finished streaming video: " + video_input)
    exit(0)

def display_frame_from_camera(data_tuple):
    queue, mp_array, np_array = data_tuple
    frame_num = queue.get()  # get the frame metadata
    print(frame_num)
    img = np_array.astype("uint8").copy()
    cv2.imshow("img", img)
    k = cv2.waitKey(1)
    if k == ord("q"):
        cv2.destroyAllWindows()
        sys.exit()
        mp_array.release()

def setup_mp_resources():
    queue = mp.Queue()
    mp_array = mp.Array(ctypes.c_uint8, int(np.prod(IMAGE_DIMENSIONS)), lock=mp.Lock())
    np_array = np.frombuffer(mp_array.get_obj(), dtype=np.uint8).reshape(IMAGE_DIMENSIONS)
    data_tuple = (queue, mp_array, np_array)
    proc = mp.Process(target=frame_stream, args=(data_tuple, VIDEO_STREAM))
    return proc, data_tuple

if __name__ == "__main__":
    proc, data_tuple = setup_mp_resources()
    proc.start()
    while True:
        display_frame_from_camera(data_tuple)

bml1g12 commented 3 years ago

Please provide your code with indentation in a code block and I can take a look. https://docs.github.com/en/github/writing-on-github/working-with-advanced-formatting/creating-and-highlighting-code-blocks

bml1g12 commented 3 years ago

But I think I can see the problem: mp_array.release() needs to be called after you have finished processing the frame. In this case, processing the frame is essentially just the cv2.imshow("img", img) call.

The reason is that, in this implementation, the "release" operation is what allows the producer process to write the next frame; the producer blocks on mp_array.acquire() until the consumer releases the lock. This prevents the producer from writing to np_array while the consumer is still using it.

The following works:

import multiprocessing as mp
import cv2
import numpy as np
import sys
import ctypes

IMAGE_DIMENSIONS = (480, 1440, 3)
VIDEO_STREAM = "/data/gym_videos/test.mp4"

def frame_stream(data_tuple, video_input):
    queue, mp_array, np_array = data_tuple
    frames_written = 0
    cap = cv2.VideoCapture(video_input, cv2.CAP_FFMPEG)
    while cap.isOpened():
        ret, frame = cap.read()
        if ret:
            mp_array.acquire()  # blocks until the consumer releases the previous frame
            np_array[:] = frame
            print("putting frame")
            queue.put(frames_written)
            print("put frame")
            frames_written += 1
        else:
            print("breaking")
            # cv2.destroyAllWindows()
            # print("Finished streaming video: " + video_input)
            # exit(0)
            break

def display_frame_from_camera(data_tuple):
    queue, mp_array, np_array = data_tuple
    print("getting frame")
    frame_num = queue.get() # get the frame metadata
    print("Got frame", frame_num)
    img = np_array.astype("uint8").copy()
    cv2.imshow("img", img)
    mp_array.release()  # let the producer overwrite np_array now that the frame has been displayed
    k = cv2.waitKey(1)
    if k == ord("q"):
        cv2.destroyAllWindows()
        sys.exit()

def setup_mp_resources():
    queue = mp.Queue()
    mp_array = mp.Array(ctypes.c_uint8, int(np.prod(IMAGE_DIMENSIONS)), lock=mp.Lock())
    np_array = np.frombuffer(mp_array.get_obj(), dtype=np.uint8).reshape(IMAGE_DIMENSIONS)
    data_tuple = (queue, mp_array, np_array)
    proc = mp.Process(target=frame_stream, args=(data_tuple,VIDEO_STREAM))
    print("created proc")
    return proc, data_tuple

if __name__ == "__main__":
    proc, data_tuple = setup_mp_resources()
    proc.start()
    while True:
        display_frame_from_camera(data_tuple)

f1fan183 commented 3 years ago

(Attached screenshot "Capture" showing the black imshow window.)

My apologies for the improper formatting. I didn't notice it at the time I posted the issue.

I tested the code changes, but they didn't work. See attached JPEG confirming the black screen.

I tried a slightly different approach with shared memory (available in Python 3.8+) and that seems to work. The downside is that there are no explicit locks. Also, it only works on Linux for me, because the consumer uses the posix_ipc library, which I was unable to compile on Windows.

The producer script:

from multiprocessing import shared_memory

import cv2
import numpy as np
import multiprocessing as mp
import time
import os

VIDEO_DIMENSIONS = (512, 640, 3)
SHOW_SOURCE = False
LOAD_RTSP = True

def stream_video():
    if LOAD_RTSP:
        os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp | fflags;nobuffer | flag;low_delay"
        video_path = "rtsp://192.168.0.30:554/ch1"
    else:
        video_path = "2_2021-01-28_13-03-34_877_Trim_masked.mp4"
    video_capture = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG)

    shm = shared_memory.SharedMemory(name="video_stream", create=True, size=int(np.prod(VIDEO_DIMENSIONS)))

    shared_image = np.ndarray(VIDEO_DIMENSIONS, dtype=np.uint8, buffer=shm.buf)

    while video_capture.isOpened():
        return_value, frame = video_capture.read()
        fps = video_capture.get(cv2.CAP_PROP_FPS)
        delay = int((1/fps)*1000)

        if (return_value):
            shared_image[:] = frame
            if SHOW_SOURCE:
                cv2.imshow('stream_video', shared_image)
                if (cv2.waitKey(1) & 0xFF == ord('q')):
                    break
            if not LOAD_RTSP: 
                time.sleep(delay/1000)
        else:
            print("Finished streaming " + video_path)
            break

    video_capture.release()
    if SHOW_SOURCE: 
        cv2.destroyAllWindows()
    shm.close()
    shm.unlink()

if __name__ == "__main__":
    proc = mp.Process(target=stream_video)
    proc.start()

consumer.py


import cv2
import numpy as np
import multiprocessing as mp
import mmap
import posix_ipc

VIDEO_DIMENSIONS = (512, 640, 3)

def read_video():
    shm = posix_ipc.SharedMemory('/video_stream')
    mapfile = mmap.mmap(shm.fd, shm.size)

    shared_image = np.ndarray(VIDEO_DIMENSIONS, dtype=np.uint8, buffer=mapfile.read())

    while True:
        mapfile.seek(0)
        shared_image = np.ndarray(VIDEO_DIMENSIONS, dtype=np.uint8, buffer=mapfile.read())  # mapfile.read() copies the current frame bytes out of shared memory
        cv2.imshow('read_video', shared_image)

        if (cv2.waitKey(1) & 0xFF == ord('q')):
            break

    cv2.destroyAllWindows()

if __name__ == "__main__":
    proc = mp.Process(target=read_video)
    proc.start()

bml1g12 commented 3 years ago

Interesting. I have been testing only on Linux (where I can confirm the script I listed works with Python 3.7 and opencv-python==4.1.2.30); let me try to reproduce this on my Windows virtual machine.

bml1g12 commented 3 years ago

Indeed, I can reproduce the black screen issue on Windows. The producer obtains the frame and writes it into np_array, but this is not shared with the consumer; it seems the consumer and producer end up with different np_array objects (which can be verified by printing id(np_array) in each process).

I can also reproduce it on Linux when setting "mp.set_start_method('spawn', force=True)".
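
To illustrate what is (and is not) shared in that situation, here is a minimal standalone sketch (not code from this repository) of the same mp.Array + np.frombuffer pattern under "spawn": the numpy view passed to the child arrives as a pickled copy, so the child's writes never reach the parent's buffer.

import ctypes
import multiprocessing as mp

import numpy as np

def writer(np_array):
    # Under "spawn" this np_array is a pickled copy, not a view onto the parent's shared buffer.
    np_array[:] = 42

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)  # mimic Windows behaviour on Linux
    mp_array = mp.Array(ctypes.c_uint8, 10)
    np_array = np.frombuffer(mp_array.get_obj(), dtype=np.uint8)
    proc = mp.Process(target=writer, args=(np_array,))
    proc.start()
    proc.join()
    print(np_array)  # all zeros under "spawn"; all 42s if the start method is "fork"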

So the issue is that Windows spawns processes rather than forking them, and the np_array reference is not passed to the child process successfully. It seems that when "spawn" mode is used (i.e. on Windows) one cannot pass a reference to np_array this way; probably the best route is the newer SharedMemory class. The docs for that, https://docs.python.org/3/library/multiprocessing.shared_memory.html, don't mention needing posix_ipc to share numpy arrays, though?
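
For reference, the pattern from those docs needs only the stdlib: the creating process makes a named SharedMemory segment, the other process attaches to it by name, and both wrap shm.buf in a numpy view. A minimal sketch (illustrative names, not this repository's benchmark code), which should work under "spawn" and needs no posix_ipc:

import multiprocessing as mp
from multiprocessing import shared_memory

import numpy as np

IMAGE_DIMENSIONS = (480, 1440, 3)

def child_writer(shm_name, frame_ready):
    # Attach to the existing segment by name and wrap it in a numpy view.
    shm = shared_memory.SharedMemory(name=shm_name)
    frame = np.ndarray(IMAGE_DIMENSIONS, dtype=np.uint8, buffer=shm.buf)
    frame[:] = 255  # stand-in for cap.read()
    frame_ready.set()
    shm.close()

if __name__ == "__main__":
    shm = shared_memory.SharedMemory(create=True, size=int(np.prod(IMAGE_DIMENSIONS)))
    frame = np.ndarray(IMAGE_DIMENSIONS, dtype=np.uint8, buffer=shm.buf)
    frame_ready = mp.Event()
    proc = mp.Process(target=child_writer, args=(shm.name, frame_ready))
    proc.start()
    frame_ready.wait()
    print(frame[0, 0])  # [255 255 255], written by the child process
    proc.join()
    shm.close()
    shm.unlink()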

I assume the naive multiprocessing queue should also work on Windows (putting the array directly onto the mp.Queue and getting it from there), but as discussed in the article, this can be a little slow.
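
That naive approach would look roughly like this (a sketch only, with illustrative names); every put() pickles and copies the whole frame, which is where the overhead comes from:

import multiprocessing as mp

import numpy as np

IMAGE_DIMENSIONS = (480, 1440, 3)

def producer(queue):
    for frame_num in range(5):
        frame = np.full(IMAGE_DIMENSIONS, frame_num, dtype=np.uint8)
        queue.put(frame)  # the whole frame is pickled and copied here
    queue.put(None)  # sentinel so the consumer knows to stop

if __name__ == "__main__":
    queue = mp.Queue()
    proc = mp.Process(target=producer, args=(queue,))
    proc.start()
    while True:
        frame = queue.get()
        if frame is None:
            break
        print("got frame with value", frame[0, 0, 0])
    proc.join()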

f1fan183 commented 3 years ago

Thanks for explaining the root cause. My target environment is Linux. I ran your modified code on Linux and it works. So now I have two approaches for efficiently passing camera numpy data from the driver (producer) to the frame processor (consumer). My prior approach was multiprocessing.Queue-based, which I found inefficient due to serialization of the numpy data, especially across multiple camera feeds.

I ended up using posix_ipc because I could not get the consumer to reliably attach to the shared memory by the name assigned by the producer when following the approach in the Python documentation. This could be a timing issue with the two processes being started by the supervisor package. I will try adding a sleep(5) to the supervisor command for the consumer to delay its start relative to the producer.
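
If the fixed delay proves fragile, another option (just a sketch, assuming the producer creates the segment named "video_stream" as in the code above) would be to have the consumer retry attaching until the segment exists rather than sleeping a fixed amount:

import time
from multiprocessing import shared_memory

import numpy as np

VIDEO_DIMENSIONS = (512, 640, 3)

def attach_to_stream(name="video_stream", timeout_s=30.0):
    # Poll until the producer has created the shared memory segment.
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            shm = shared_memory.SharedMemory(name=name)  # attach only, do not create
            return shm, np.ndarray(VIDEO_DIMENSIONS, dtype=np.uint8, buffer=shm.buf)
        except FileNotFoundError:
            if time.monotonic() > deadline:
                raise
            time.sleep(0.1)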