NVIDIA / VideoProcessingFramework

Set of Python bindings to C++ libraries that provide full HW acceleration for video decoding, encoding, and GPU-accelerated color space and pixel format conversions
Apache License 2.0
1.32k stars 233 forks

Multi threading with Pytorch #416

Closed NguyenVanThanhHust closed 1 year ago

NguyenVanThanhHust commented 1 year ago

Hi. I want to use multithreading to decode video and classify the frames with PyTorch.

I tried the SampleDecode Multi Threading and SamplePytorch Resnet samples separately, and both worked.

I then tried pushing decoded images to a queue and having PyTorch read from that queue. I created several threads to decode and one thread to classify, but the thread that processes the video does not start until all the decoding threads have finished.

Could you provide an example of how to do that? That would be very helpful.

Below is the code I used for this approach:

#
# Copyright 2021 NVIDIA Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Starting from Python 3.8 DLL search policy has changed.
# We need to add path to CUDA DLLs explicitly.
import sys
import os

import torch
import torchvision

if os.name == 'nt':
    # Add CUDA_PATH env variable
    # Use .get() so a missing variable reaches the error branch instead of raising KeyError.
    cuda_path = os.environ.get("CUDA_PATH")
    if cuda_path:
        os.add_dll_directory(cuda_path)
    else:
        print("CUDA_PATH environment variable is not set.", file=sys.stderr)
        print("Can't set CUDA DLLs search path.", file=sys.stderr)
        exit(1)

    # Add PATH as well for minor CUDA releases
    sys_path = os.environ.get("PATH")
    if sys_path:
        paths = sys_path.split(';')
        for path in paths:
            if os.path.isdir(path):
                os.add_dll_directory(path)
    else:
        print("PATH environment variable is not set.", file=sys.stderr)
        exit(1)

import pycuda.driver as cuda
import PyNvCodec as nvc
import PytorchNvCodec as pnvc
import numpy as np
from queue import Queue

main_queue = Queue()

from category import categories

from threading import Thread
import time

class Worker(Thread):
    def __init__(self, gpuID, encFile, threadId):
        Thread.__init__(self)
        # Retain primary CUDA device context and create separate stream per thread.
        self.ctx = cuda.Device(gpuID).retain_primary_context()
        self.ctx.push()
        self.str = cuda.Stream()
        self.ctx.pop()
        self.gpuId = gpuID
        self.threadId = threadId

        # Resnet expects images to be 3 channel planar RGB of 224x224 size at least.
        self.target_w, self.target_h = 224, 224

        # Create Decoder with given CUDA context & stream.
        self.nvDec = nvc.PyNvDecoder(encFile, self.ctx.handle, self.str.handle)

        width, height = self.nvDec.Width(), self.nvDec.Height()
        hwidth, hheight = int(width / 2), int(height / 2)

        # Determine colorspace conversion parameters.
        # Some video streams don't specify these parameters so default values
        # are most widespread bt601 and mpeg.
        cspace, crange = self.nvDec.ColorSpace(), self.nvDec.ColorRange()
        if nvc.ColorSpace.UNSPEC == cspace:
            cspace = nvc.ColorSpace.BT_601
        if nvc.ColorRange.UDEF == crange:
            crange = nvc.ColorRange.MPEG
        self.cc_ctx = nvc.ColorspaceConversionContext(cspace, crange)
        print('Color space: ', str(cspace))
        print('Color range: ', str(crange))

        # Initialize colorspace conversion chain.
        # Always create the decoder-format -> YUV420 converter: run() calls it
        # unconditionally, so leaving it as None would raise AttributeError there.
        self.nvYuv = nvc.PySurfaceConverter(width, height, self.nvDec.Format(), nvc.PixelFormat.YUV420, self.ctx.handle, self.str.handle)

        self.to_dim = nvc.PySurfaceResizer(self.target_w, self.target_h, nvc.PixelFormat.YUV420,
                                  self.gpuId)

        self.to_rgb = nvc.PySurfaceConverter(self.target_w, self.target_h,
                                        nvc.PixelFormat.YUV420, nvc.PixelFormat.RGB,
                                        self.gpuId)

        self.to_pln = nvc.PySurfaceConverter(self.target_w, self.target_h, nvc.PixelFormat.RGB,
                                        nvc.PixelFormat.RGB_PLANAR, self.gpuId)

        # Use most widespread bt601 and mpeg just for illustration purposes.
        # Note: this replaces the cc_ctx built from the stream parameters above.
        self.cc_ctx = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_601,
                                             nvc.ColorRange.MPEG)
        self.num_frame = 0

    def run(self):
        try:
            while True:
                try:
                    # Decode 1 compressed video frame to CUDA memory.
                    self.rawSurface = self.nvDec.DecodeSingleSurface()
                    if (self.rawSurface.Empty()):
                        print('No more video frames')
                        break
                except nvc.HwResetException:
                    print('Continue after HW decoder was reset')
                    continue

                yuvSurface = self.nvYuv.Execute(self.rawSurface, self.cc_ctx)
                if (yuvSurface.Empty()):
                    print('Failed to do yuv conversion')
                    break

                # Downscale YUV420.
                yuv_small = self.to_dim.Execute(yuvSurface)
                if yuv_small.Empty():
                    print('Can not downscale yuv420 surface')
                    break

                # Convert from YUV420 to interleaved RGB.
                rgb24_small = self.to_rgb.Execute(yuv_small, self.cc_ctx)
                if rgb24_small.Empty():
                    print('Can not convert yuv420 -> rgb')
                    break

                # Convert to planar RGB.
                rgb24_planar = self.to_pln.Execute(rgb24_small, self.cc_ctx)
                if rgb24_planar.Empty():
                    print('Can not convert rgb -> rgb planar')
                    break

                # Export to PyTorch tensor
                surf_plane = rgb24_planar.PlanePtr()
                img_tensor = pnvc.makefromDevicePtrUint8(surf_plane.GpuMem(),
                                                        surf_plane.Width(),
                                                        surf_plane.Height(),
                                                        surf_plane.Pitch(),
                                                        surf_plane.ElemSize())

                img_tensor.resize_(3, self.target_h, self.target_w)
                # print(type(img_tensor), img_tensor.device)
                # img_tensor = img_tensor.type(dtype=torch.cuda.FloatTensor)
                # Scale to [0, 1] on the GPU, then copy the scaled frame to host memory for the queue.
                img_tensor = torch.divide(img_tensor, 255.0)
                img_np = img_tensor.detach().cpu().numpy()

                # data_transforms = torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406],
                #                                                 std=[0.229, 0.224, 0.225])
                # surface_tensor = data_transforms(img_tensor)
                main_queue.put(img_np)
                print("main_queue.qsize(): ", main_queue.qsize())

                self.num_frame += 1
                if(0 == self.num_frame % 10):
                    print(self.num_frame)
                    print(self.threadId, main_queue.qsize())

        except Exception as e:
            print(getattr(e, 'message', str(e)))

class MainThread(Thread):
    def __init__(self, ):
        Thread.__init__(self)
        self.model = torchvision.models.resnet50(pretrained=True)
        self.model.eval()
        self.model.to('cuda')
        print("run here")

    def run(self):
        print("run here too")
        try:
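            from queue import Empty  # local import: only Queue is imported at module level
            while True:
                try:
                    # Block until a frame arrives rather than exiting whenever the
                    # queue is momentarily empty. The 5 s timeout is an arbitrary
                    # choice for deciding that the decoders have finished.
                    surface_np = main_queue.get(timeout=5.0)
                except Empty:
                    break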
                surface_tensor = torch.from_numpy(surface_np)
                surface_tensor = surface_tensor.cuda().float()
                input_batch = surface_tensor.unsqueeze(0).to('cuda')

                # Run inference.
                with torch.no_grad():
                    output = self.model(input_batch)

                probabilities = torch.nn.functional.softmax(output[0], dim=0)

                top5_prob, top5_catid = torch.topk(probabilities, 5)
                for i in range(top5_prob.size(0)):
                    print(categories[top5_catid[i]], top5_prob[i].item())
        except Exception as e:
            print("Exception: ", e)
            time.sleep(1)

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print('Provide gpu ID, paths to input video file.')
        sys.exit(1)

    gpu_id = int(sys.argv[1])
    input_video = sys.argv[2]

    cuda.init()
    num_threads = 2
    thread_pool = []
    for i in range(0, num_threads):
        thread = Worker(gpu_id, input_video, i+1)
        thread.start()
        thread_pool.append(thread)
    mainThread = MainThread()
    mainThread.start()

    # run_inference_on_video(gpu_id, input_video)

    for thread in thread_pool:
        thread.join()
    mainThread.join()

gedoensmax commented 1 year ago

I am guessing this has to do with the way Python does "threading". As long as you are executing Python code and do not dispatch into C libraries, there is no parallel processing because of Python's global interpreter lock: https://docs.python.org/3/glossary.html#term-global-interpreter-lock All VPF operations return rather quickly and then have to re-acquire this lock, so you might want to look into multiprocessing instead of threading. Another option would be to do decode + classify in multiple threads instead of a producer-consumer scheme. PyTorch dispatches into a C library to execute the network, which runs for "longer" and leaves more time for another thread to make progress. I hope this makes sense and is not too misleading.
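
If the producer-consumer layout is kept, the consumer has to block on the queue and needs an explicit signal that every decoder has finished; polling `Queue.empty()` lets it exit whenever the queue happens to be drained. Below is a minimal sketch of that pattern using only the standard library. `fake_decode` and `fake_classify` are hypothetical stand-ins for the VPF decode chain and the PyTorch model, and the queue size and frame counts are arbitrary assumptions.

```python
import threading
from queue import Queue

SENTINEL = None          # marks "this producer is done"
NUM_PRODUCERS = 2
FRAMES_PER_PRODUCER = 5  # arbitrary, for illustration only


def fake_decode(producer_id, frame_idx):
    """Hypothetical stand-in for decoding one frame with VPF."""
    return (producer_id, frame_idx)


def fake_classify(frame):
    """Hypothetical stand-in for running the PyTorch model on one frame."""
    return f"label-for-{frame[0]}-{frame[1]}"


def producer(producer_id, frames):
    for idx in range(FRAMES_PER_PRODUCER):
        # put() blocks while the queue is full, which throttles the decoders.
        frames.put(fake_decode(producer_id, idx))
    frames.put(SENTINEL)  # tell the consumer this producer has finished


def consumer(frames, results):
    finished = 0
    while finished < NUM_PRODUCERS:
        item = frames.get()  # blocks until a frame or a sentinel arrives
        if item is SENTINEL:
            finished += 1
            continue
        results.append(fake_classify(item))


def run_pipeline():
    frames = Queue(maxsize=8)  # bounded, so memory use stays limited
    results = []
    threads = [threading.Thread(target=producer, args=(i, frames))
               for i in range(NUM_PRODUCERS)]
    threads.append(threading.Thread(target=consumer, args=(frames, results)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(len(run_pipeline()), "frames classified")
```

The bounded queue also keeps the decoders from running arbitrarily far ahead of the classifier. This sketch does not change how the GIL schedules the threads; it only makes the hand-off and shutdown explicit.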

Also, in general: have you identified having everything in one thread as a bottleneck? You could check this by looking at the GPU utilization or by profiling with Nsight Systems.

NguyenVanThanhHust commented 1 year ago

Actually, I tried multiprocessing first, but it seems CUDA doesn't allow multiple processes to decode on the GPU.

As for decode + classify in multiple threads, it seems this would create multiple models on the GPU, which is quite memory-consuming.
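
Threads of one process share the same address space, so the decode + classify layout does not by itself require one model copy per thread: a single model object can be created once and handed to every worker. The sketch below illustrates only that sharing pattern, using the standard library. `SharedModel` and `fake_decode` are hypothetical stand-ins for a `torch.nn.Module` in eval mode and for a per-thread VPF decoder. The lock is an assumption that serializes calls conservatively; whether concurrent forward passes are safe without it would need to be checked for the model in question.

```python
import threading


class SharedModel:
    """Hypothetical stand-in for one torch.nn.Module loaded once on the GPU."""

    def __init__(self):
        self._lock = threading.Lock()
        self.calls = 0

    def __call__(self, frame):
        # Serialize inference so only one thread uses the model at a time.
        with self._lock:
            self.calls += 1
            return frame * 2  # placeholder for a forward pass


def fake_decode(stream_id, num_frames):
    """Hypothetical stand-in for a per-thread VPF decoder."""
    for idx in range(num_frames):
        yield stream_id * 100 + idx


def decode_and_classify(stream_id, model, out):
    # Each thread owns its decoder but calls the same model instance.
    out[stream_id] = [model(frame) for frame in fake_decode(stream_id, 4)]


def run_workers(num_streams=3):
    model = SharedModel()  # created once, shared by all threads
    out = {}
    threads = [threading.Thread(target=decode_and_classify, args=(i, model, out))
               for i in range(num_streams)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return model, out


if __name__ == "__main__":
    shared, outputs = run_workers()
    print(shared.calls, "forward passes through a single model instance")
```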

I tried multiprocessing both from pure Python and from PyTorch.

"Have you identified having everything in one thread as a bottleneck?" Actually, no, a single thread is fine for me for now. For my application, I can create many decoders, push the results to a fixed-size list, and then run the PyTorch model on that list. I was just wondering whether multithreading could be used to optimize this further.
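
The single-threaded variant described here, with several decoders feeding a fixed-size list that the model then consumes as one batch, could look roughly like the sketch below. `fake_decoder` and `fake_model` are hypothetical placeholders for `PyNvDecoder` instances and the PyTorch model, and the batch size of 4 is arbitrary.

```python
from itertools import zip_longest

BATCH_SIZE = 4  # arbitrary, for illustration only


def fake_decoder(stream_id, num_frames):
    """Hypothetical stand-in for one decoder yielding frames until the stream ends."""
    for idx in range(num_frames):
        yield (stream_id, idx)


def fake_model(batch):
    """Hypothetical stand-in for one batched forward pass."""
    return [f"label-{stream_id}-{idx}" for stream_id, idx in batch]


def interleave(decoders):
    """Round-robin over the decoders, skipping those that have run out of frames."""
    missing = object()
    for group in zip_longest(*decoders, fillvalue=missing):
        for frame in group:
            if frame is not missing:
                yield frame


def classify_streams(decoders, batch_size=BATCH_SIZE):
    results = []
    batch = []
    for frame in interleave(decoders):
        batch.append(frame)
        if len(batch) == batch_size:
            results.extend(fake_model(batch))
            batch = []
    if batch:  # flush the final, partially filled batch
        results.extend(fake_model(batch))
    return results


if __name__ == "__main__":
    streams = [fake_decoder(0, 3), fake_decoder(1, 5)]
    print(classify_streams(streams))
```

Batching amortizes the per-call overhead of the model over several frames, which is often where a single-threaded pipeline gains the most before threads are needed at all.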

Thanks.