NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0

benchmark versus iterable-style datasets #1990

Open etienne87 opened 4 years ago

etienne87 commented 4 years ago

Sorry if this is not an issue per se. Would it be interesting to benchmark nvidia-dali against the iterable-style PyTorch DataLoader, which also allows stateful streaming across multiple processes (or even against a custom multiprocessing setup in Python + NumPy)? Do we expect much better performance on the nvidia-dali side?

opencv_streamer.py

from __future__ import print_function
from __future__ import absolute_import
from __future__ import division

import random

import cv2

class OpenCVStream(object):
    def __init__(
        self,
        video_filename,
        height,
        width,
        seek_frame=0,
        max_frames=-1,
        random_start=True,
        rgb=False,
    ):
        self.height = height
        self.width = width
        self.random_start = random_start
        self.max_frames = max_frames
        self.rgb = rgb
        self.reload(video_filename, seek_frame)

    def original_size(self):
        # Native resolution of the underlying video file.
        height = self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        width = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        return int(height), int(width)

    def reload(self, video_filename, seek_frame=-1):
        self.filename = video_filename
        self.cap = cv2.VideoCapture(video_filename)
        self.iter = 0
        if self.random_start and seek_frame == -1:
            # Pick a random start somewhere in the first half of the video.
            num_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
            seek_frame = random.randint(0, num_frames // 2)
        else:
            seek_frame = max(seek_frame, 0)
        if seek_frame > 0:
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, seek_frame)
        self.start = seek_frame
        if self.height == -1 or self.width == -1:
            self.height, self.width = self.original_size()

    def pos_frame(self):
        return self.start + self.iter

    def __len__(self):
        if self.max_frames > -1:
            return self.max_frames
        num_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        return num_frames - self.start

    def __next__(self):
        if not self.cap.isOpened() or (
            self.max_frames > -1 and self.iter >= self.max_frames
        ):
            raise StopIteration

        ret, frame = self.cap.read()
        if not ret:
            # End of stream: stop instead of returning empty frames.
            raise StopIteration
        if not self.rgb:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame = cv2.resize(
            frame, (self.width, self.height), interpolation=cv2.INTER_AREA
        )
        self.iter += 1
        return frame

    def __iter__(self):
        return self
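
For reference, a minimal way to exercise the stream on its own (the file path is a hypothetical placeholder):

stream = OpenCVStream("/path/to/some_video.mp4", 240, 320)  # hypothetical path
for i, frame in enumerate(stream):
    print(i, frame.shape)  # (240, 320) grayscale frames
    if i == 9:
        break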

stream_loader.py

import glob
import torch
import numpy as np
from torchvision.utils import make_grid
from torch.utils.data import IterableDataset, DataLoader
from itertools import chain
import random

from opencv_streamer import OpenCVStream
import cv2
import time

class MyIterableDataset(IterableDataset):
    def __init__(self, data_list, batch_size=4, tbins=5):
        self.data_list = data_list
        self.batch_size = batch_size
        self.tbins = tbins

    @property
    def shuffle_data_list(self):
        return random.sample(self.data_list, len(self.data_list))

    def process_data(self, data):
        # Decode one video and yield clips of tbins consecutive frames.
        stream = OpenCVStream(data, 240, 320)
        worker = torch.utils.data.get_worker_info()
        worker_id = worker.id if worker is not None else -1
        print("worker", worker_id, "streaming:", data)

        out = []
        for x in stream:
            out.append(x[None])
            if len(out) == self.tbins:
                yield np.concatenate(out)
                out = []

    def get_stream(self, data_list):
        tmp = map(self.process_data, iter(data_list))
        out = chain.from_iterable(tmp)
        return out

    def __iter__(self):
        return zip(
            *[self.get_stream(self.shuffle_data_list) for _ in range(self.batch_size)]
        )

    @classmethod
    def split_datasets(cls, data_list, batch_size, tbins, max_workers):
        # Pick the largest worker count (at most max_workers) that divides
        # the batch size, so every worker builds an equal share of the batch.
        for n in range(max_workers, 0, -1):
            if batch_size % n == 0:
                num_workers = n
                break
        split_size = batch_size // num_workers
        return [
            cls(data_list, batch_size=split_size, tbins=tbins)
            for _ in range(num_workers)
        ]
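
# Worked example: batch_size=4 with max_workers=4 gives num_workers=4 and
# split_size=1 (four dataset copies, one sample each per step); with
# max_workers=3 it falls back to num_workers=2 and split_size=2.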

class MultiStreamDataLoader:
    def __init__(self, datasets):
        self.datasets = datasets

    def get_stream_loaders(self):
        dataloaders = [
            DataLoader(dataset, num_workers=1, batch_size=None, pin_memory=True)
            for dataset in self.datasets
        ]
        return zip(*dataloaders)

    def __iter__(self):
        for batch_parts in self.get_stream_loaders():
            yield list(chain(*batch_parts))
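
# Each iteration yields a list of batch_size tensors of shape (tbins, H, W):
# the per-worker sub-batches from the single-worker DataLoaders above,
# flattened into one temporally coherent batch.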

video_dir = "/home/etienneperot/workspace/data/slow-motion/video01/"

data_list = glob.glob(video_dir + "*.MP4")

viz = False
datasets = MyIterableDataset.split_datasets(
    data_list, batch_size=4, tbins=10, max_workers=4
)
loader = MultiStreamDataLoader(datasets)
start = time.time()

for data in loader:

    batch = torch.cat([item[:, None] for item in data], dim=1)
    print(time.time() - start, " s for loading")

    if viz:
        for t in range(0, datasets[0].tbins, 4):
            b = batch[t][:, None]
            im = make_grid(b).cpu().permute(1, 2, 0).numpy().copy()
            cv2.imshow("img", im)
            cv2.waitKey(5)
    else:
        time.sleep(0.4)

    start = time.time()
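
For a rough relative-runtime comparison against DALI, an aggregate frames-per-second number over many batches is more robust than per-batch wall time; a minimal sketch reusing the loader above:

n_batches = 50
frames = 0
t0 = time.time()
for i, data in enumerate(loader):
    batch = torch.cat([item[:, None] for item in data], dim=1)  # (tbins, B, H, W)
    frames += batch.shape[0] * batch.shape[1]
    if i + 1 == n_batches:
        break
print(frames / (time.time() - t0), "frames/s")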
JanuszL commented 4 years ago

Hi, DALI doesn't use multiple processes but multiple threads at the native level. The approach PyTorch has taken is a result of Python's inability to perform true multithreaded processing. For DALI this is not an issue, as all the processing happens on the native side without the need to hold the Python GIL. To answer your question: when you use built-in DALI operators, there should not be much of a performance difference beyond what comes from the fact that some processing happens at the Python level in the PyTorch data loader, while DALI does everything natively and has no need for interprocess communication via shared memory. The difference shows up when you ask DALI to run a Python operator - in that case the Python GIL limitation comes into play and DALI cannot process in parallel.
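
For context, a minimal sketch of what the DALI side of such a comparison could look like, using the ops API current at the time of this thread; the batch size, sequence length, and data_list mirror the script above, and the exact arguments should be treated as illustrative:

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops

class VideoPipe(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, filenames, sequence_length):
        super(VideoPipe, self).__init__(batch_size, num_threads, device_id, seed=12)
        # GPU video decoding; runs in native threads, outside the Python GIL.
        self.reader = ops.VideoReader(
            device="gpu",
            filenames=filenames,
            sequence_length=sequence_length,
            random_shuffle=True,
            initial_fill=16,
        )

    def define_graph(self):
        return self.reader(name="Reader")

pipe = VideoPipe(batch_size=4, num_threads=2, device_id=0,
                 filenames=data_list, sequence_length=10)
pipe.build()
sequences, = pipe.run()  # one batch of frame sequences, resident on the GPU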

etienne87 commented 4 years ago

Thanks for answering so fast! Just to be sure: you would say, then, that DALI is worth it because preprocessing avoids the latency of Python calls (to OpenCV functions, for instance), and because memory sharing in C++ is faster thanks to threads rather than multiprocessing, right? I am just looking for a rough estimate of the relative runtime difference (say 1.5x?) when streaming those video clips in a temporally coherent manner across batches (presumably it grows with video clip resolution and length).

JanuszL commented 4 years ago

What I wanted to point out were the things that should be considered, but I cannot tell how much each of them contributes to the overall performance difference (I also don't know the PyTorch data loader implementation that well). The overhead will contribute less to the total time as the data gets bigger - the amount of processing done by each invocation (the OpenCV work) grows, while the invocation overhead stays more or less constant. You also need to consider that operations implemented in OpenCV and DALI may differ in performance themselves. We plan to implement the Resize operator for video soon, so you will be able to run a side-by-side perf comparison on your own.