bml1g12 / benchmarking_multiprocessing_np_array_comms

Benchmarking communication of numpy arrays between Python processes

How to improve object detection performance by applying this method? #6

Closed HtutLynn closed 3 years ago

HtutLynn commented 3 years ago

Hi @bml1g12, my search for ways to optimize a real-time object detection pipeline led me to your great Medium article and from there to this repo. I really appreciate the article and your work.

Right now, I am trying to improve a real-time object detection pipeline by using multiprocessing and multithreading. In my case, I have 4 processes that handle separate tasks. The main ones are:

  1. A frame multiplex process, which creates batches of preprocessed numpy arrays from multiple RTSP streams and uses multithreading within the process.
  2. An inference process, which runs the deep learning object detection model.

In order to maximize deep learning inference throughput, I need to keep the time spent transferring data from the frame multiplex process to the inference process as low as possible. So far I have tested two approaches:

  1. Using mp.Queue to communicate between the two processes. However, this approach becomes really problematic because the inference process can't keep up with the multiplex process, the queue grows, and a memory overflow happens.
  2. Using mp.Pipe. Although this is better than mp.Queue since there is no buffer overflow problem, it is not ideal either, because transferring numpy arrays through the pipe still makes the deep learning model's inference performance dip to around 40%.

Since this is a real-time object detection pipeline, I also need to run it in a while loop. Can you please suggest how I could handle this or improve performance in my case?

Regards, Htut

bml1g12 commented 3 years ago

So you have several RTSP streams, and each one needs inference performed on the images? Does information need to be shared between the streams, like:

RTSP stream 1, 2 and 3 --> big numpy array --> pre-processing process --> inference

or are they independent, like:

RTSP stream 1 --> numpy array --> pre-processing process --> inference
RTSP stream 2 --> numpy array --> pre-processing process --> inference
RTSP stream 3 --> numpy array --> pre-processing process --> inference

?

In the article, I suggest using a shared-memory mp.Array, together with either a queue or a pipe to pass a reference to that memory (not the numpy array itself) - this avoids the overhead of serialising the data to share it between processes, which is very slow. It sounds likely to be suitable for your scenario too, but you mentioned memory overflow - that should not be possible if you use the approach in the article, as the amount of memory needed is reserved at the start and is fixed.

bml1g12 commented 3 years ago

In terms of reading the RTSP streams, if it is a bottleneck you might find some speed-up in using third-party libraries, e.g. https://towardsdatascience.com/lightning-fast-video-reading-in-python-c1438771c4e6, which essentially multithread the reading part. But likely inference is your bottleneck (as in most AI applications), in which case the reading speed won't make much difference.
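
For what it's worth, the gist of the multithreaded-reading idea is something like the sketch below. This is my own minimal version using OpenCV, not the library from that article; the ThreadedReader class and the stream URL are hypothetical.

import threading

import cv2  # assumes opencv-python is available


class ThreadedReader:
    """Read frames from a capture source on a background thread, keeping only
    the most recent frame so the consumer never blocks on network I/O."""

    def __init__(self, source):
        self.cap = cv2.VideoCapture(source)
        self.lock = threading.Lock()
        self.frame = None
        self.running = True
        self.thread = threading.Thread(target=self._reader, daemon=True)
        self.thread.start()

    def _reader(self):
        while self.running:
            ret, frame = self.cap.read()
            if not ret:
                continue  # a real implementation would handle stream drop-outs here
            with self.lock:
                self.frame = frame

    def read(self):
        with self.lock:
            return None if self.frame is None else self.frame.copy()

    def stop(self):
        self.running = False
        self.thread.join()
        self.cap.release()


# Hypothetical usage:
# reader = ThreadedReader("rtsp://example.com/stream1")
# frame = reader.read()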

HtutLynn commented 3 years ago

@bml1g12 , Thanks for the reply. As for RTSP stream inference, it's going to be a batch inference approach since it is much faster that way. The RTSP reading process is not a big concern, because my performance target for this pipeline is around 24 fps, and as long as the RTSP reading process can output up to 24 fps, that's totally fine.

I am currently using the pipe approach and it's still much better than the serial approach, which does preprocessing, inference and postprocessing one by one. However, after benchmarking, it seems that the pipe still struggles with transferring large numpy arrays between processes.

Here is my implementation:

def batch_multiplex_process(streams, preprocess_fn, model_type, input_size, skip_frames, batch_parent_conn, run):
    # Create Batch Loader Object for parsing frames from multiple inputs asynchronously
    # Uses threads for getting frames
    BatchLoader = LoadStreams(sources=streams,
                              preprocess_fn=preprocess_fn,
                              model_type=model_type,
                              input_shape=[input_size, input_size],
                              skip_frames=skip_frames)

    while True:
        # get the data from LoadStreams object
        batch_data, ori_imgs, metas = next(BatchLoader)

        # NUMPY ARRAY : ([number of streams, 3, 416, 416]), data type is np.float32
        # if it is 1 stream, NUMPY ARRAY is around 2.3 MB
        batch_parent_conn.send(batch_data)

def inference_process(detection_model, batch_child_conn, run):
    # keeps getting preprocessed batches from the pipe
    while True:
        # receiving this large array takes quite a long time
        preprocessed_batch_data = batch_child_conn.recv()

        inferred_outputs = detection_model.infer(preprocessed_data=preprocessed_batch_data,
                                                 batch_size=len(preprocessed_batch_data))

Can you please provide some examples for this implementation?

bml1g12 commented 3 years ago

The trick is that you need to make batch_data not a bunch of numpy array data, but instead a reference to some memory containing the numpy array, using shared memory. There's a reference example here:

https://github.com/bml1g12/benchmarking_multiprocessing_np_array_comms/blob/main/array_benchmark/demo_application_benchmarking/shared_memory_array.py

If I understand correctly, your BatchLoader here is loading data from many streams into one mega-array? Is that a requirement? If you can process each stream independently, it would presumably lower the communication needs and speed up your parallelism. But if your bottleneck is not the next(BatchLoader) call, then I suppose it's fine making a mega-array of all the streams :+1:

i.e. it looks like what you have is something like:

Process 1: "RTSP stream 1, 2 and 3 --> big numpy array --> pre-processing --> Process 2"
Process 2: "inference"

So if you know the shape of the array in advance, then you can use the approach in shared_memory_array.py to get a speedup.

I.e. somewhere at the start of your software, you reserve this memory, where array_dim = (number of streams, 3, 416, 416):

        queue = mp.Queue(maxsize=100)  # carries only lightweight frame metadata
        mp_array = mp.Array("I", int(np.prod(frame_gen_config["array_dim"])), lock=mp.Lock())
        np_array = np.frombuffer(mp_array.get_obj(), dtype="I").reshape(
            frame_gen_config["array_dim"])  # numpy view onto the shared buffer, no copy made
        per_camera_arrays[camera_index] = (queue, mp_array, np_array)

Here the queue is just to store frame metadata.

Then in your producer (batch_multiplex_process), instead of batch_parent_conn.send(batch_data) you can have:

        mp_array.acquire()  # lock the shared buffer so the consumer can't read a half-written frame
        np_array[:] = frame  # copy the batch into shared memory (no serialisation)
        queue.put(frames_written)  # store metadata related to the frame, such as a timestamp

https://github.com/bml1g12/benchmarking_multiprocessing_np_array_comms/blob/4d6855927d7d1956475ee6bcd6c2191dba192e4a/array_benchmark/demo_application_benchmarking/shared_memory_array.py#L39

And instead of your current consumer inference_process (preprocessed_batch_data = batch_child_conn.recv()), you can have:

    _ = queue.get()  # get the frame metadata
    img = np_array.astype("uint8").copy()
    # Do inference and ONLY WHEN IT IS DONE release the memory here, as otherwise it might be edited by producer whilst you are doing inference on it!
    mp_array.release()

This implementation uses shared memory to store a single array, which the inference process releases only once it has finished processing it.
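
To make that concrete, here is a rough, self-contained sketch of how the pieces above could fit together. It is not code from this repo: I've assumed dtype "f" (32-bit float) instead of "I" because your batches are np.float32, used a hypothetical ARRAY_DIM, and replaced LoadStreams and the detection model with dummy stand-ins.

import multiprocessing as mp

import numpy as np

ARRAY_DIM = (4, 3, 416, 416)  # (number of streams, 3, 416, 416); adjust to your setup
N_BATCHES = 100


def producer(mp_array, queue):
    """Stand-in for batch_multiplex_process: writes float32 batches into shared memory."""
    # Build the numpy view onto the shared buffer inside the process
    np_array = np.frombuffer(mp_array.get_obj(), dtype=np.float32).reshape(ARRAY_DIM)
    for frame_index in range(N_BATCHES):
        batch_data = np.random.rand(*ARRAY_DIM).astype(np.float32)  # stands in for next(BatchLoader)
        mp_array.acquire()        # lock the buffer until the consumer releases it
        np_array[:] = batch_data  # copy into shared memory; nothing gets pickled
        queue.put(frame_index)    # only small metadata goes through the queue
    queue.put(None)               # sentinel so the consumer knows to stop


def consumer(mp_array, queue):
    """Stand-in for inference_process: reads batches back out of shared memory."""
    np_array = np.frombuffer(mp_array.get_obj(), dtype=np.float32).reshape(ARRAY_DIM)
    while True:
        metadata = queue.get()
        if metadata is None:
            break
        batch = np_array.copy()   # or run inference on the view and release afterwards
        mp_array.release()
        # detection_model.infer(preprocessed_data=batch, batch_size=len(batch))


if __name__ == "__main__":
    queue = mp.Queue(maxsize=100)
    # "f" = 32-bit C float, matching the np.float32 batches (the repo example uses "I" for images)
    mp_array = mp.Array("f", int(np.prod(ARRAY_DIM)), lock=mp.Lock())

    procs = [mp.Process(target=producer, args=(mp_array, queue)),
             mp.Process(target=consumer, args=(mp_array, queue))]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()

The only thing that travels through the queue is a small integer of metadata; the multi-megabyte batch itself is just a memory copy into the buffer that was reserved up front.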

One limitation is that, as written here, you'd only be able to have one inference process working on one frame at a time (the .acquire()/.release() pair locks the memory until we are done processing it). If you want to have many inference processes running on many frames at a time, you can extend this implementation to have, rather than one reserved patch of memory, as many patches as you need, and arrange it so that when you start a process, it knows which patch of memory to look at.
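
One hypothetical way to extend it is sketched below: reserve a small pool of shared buffers and use two queues, one holding the indices of free buffers and one holding the indices of filled buffers, so that several inference processes can each work on a different buffer at the same time. Names like N_BUFFERS, N_CONSUMERS and the random stand-in batch are my own, not from the repo.

import multiprocessing as mp

import numpy as np

ARRAY_DIM = (4, 3, 416, 416)  # hypothetical batch shape
N_BUFFERS = 4                 # how many batches can be "in flight" at once
N_CONSUMERS = 2               # e.g. two inference processes


def producer(pool, free_slots, filled_slots):
    # The slot queues already serialise access, so no per-buffer lock is needed
    views = [np.frombuffer(buf, dtype=np.float32).reshape(ARRAY_DIM) for buf in pool]
    for frame_index in range(100):
        batch = np.random.rand(*ARRAY_DIM).astype(np.float32)  # stands in for the real batch
        slot = free_slots.get()                # blocks until some buffer is free
        views[slot][:] = batch                 # plain memory copy, no pickling
        filled_slots.put((slot, frame_index))  # tell a consumer which buffer to read
    for _ in range(N_CONSUMERS):
        filled_slots.put(None)                 # one stop sentinel per consumer


def consumer(pool, free_slots, filled_slots):
    views = [np.frombuffer(buf, dtype=np.float32).reshape(ARRAY_DIM) for buf in pool]
    while True:
        item = filled_slots.get()
        if item is None:
            break
        slot, frame_index = item
        batch = views[slot].copy()             # or run inference on the view first
        free_slots.put(slot)                   # hand the buffer back for reuse
        # detection_model.infer(preprocessed_data=batch, batch_size=len(batch))


if __name__ == "__main__":
    # Reserve N_BUFFERS shared float32 buffers up front
    pool = [mp.Array("f", int(np.prod(ARRAY_DIM)), lock=False) for _ in range(N_BUFFERS)]
    free_slots, filled_slots = mp.Queue(), mp.Queue()
    for slot in range(N_BUFFERS):
        free_slots.put(slot)

    workers = [mp.Process(target=producer, args=(pool, free_slots, filled_slots))]
    workers += [mp.Process(target=consumer, args=(pool, free_slots, filled_slots))
                for _ in range(N_CONSUMERS)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()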

The whole thing is a lot more fiddly than a simple queue or pipe, but the problem with multiprocessing in Python with numpy arrays and either pipes or queues is that they both serialise the data into a pickle, and that's incredibly slow, so there's no easy way around this. I think shared memory is the only way to communicate numpy arrays without serialisation when using multiprocessing.
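
If you want to see the serialisation cost for yourself, a quick timing sketch like the one below compares pickling a batch (part of what a queue or pipe does under the hood; the real cost also includes pushing the bytes through a pipe and unpickling them on the other side) with a plain copy into an already-reserved shared buffer. The shape and repeat count here are arbitrary.

import multiprocessing as mp
import pickle
import time

import numpy as np

ARRAY_DIM = (4, 3, 416, 416)  # hypothetical batch shape
N_REPEATS = 100
batch = np.random.rand(*ARRAY_DIM).astype(np.float32)

# Part of what a Queue/Pipe does under the hood: pickle the whole array
start = time.perf_counter()
for _ in range(N_REPEATS):
    payload = pickle.dumps(batch, protocol=pickle.HIGHEST_PROTOCOL)
print("pickle.dumps per batch:  %.5f s" % ((time.perf_counter() - start) / N_REPEATS))

# The shared-memory alternative: a plain copy into a reserved buffer
mp_array = mp.Array("f", int(np.prod(ARRAY_DIM)), lock=False)
np_array = np.frombuffer(mp_array, dtype=np.float32).reshape(ARRAY_DIM)
start = time.perf_counter()
for _ in range(N_REPEATS):
    np_array[:] = batch
print("copy into shared buffer: %.5f s" % ((time.perf_counter() - start) / N_REPEATS))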

HtutLynn commented 3 years ago

@bml1g12 , Thanks for the detailed reply. As you mentioned, fetching data from the dataloader is not a bottleneck in my case; instead, it is the pipe that takes quite a long time to send data from batch_multiplex_process to inference_process. I think the approach you mentioned here would work well. I've been learning multiprocessing recently, and this really helps me.