NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0

COCOPipeline performance doesn't scale linearly with GPU count #2064

Closed XiaotaoChen closed 4 years ago

XiaotaoChen commented 4 years ago

I'm using COCOPipeline to speed up training for detection tasks and benchmarking its performance with different GPU counts. There are two problems that confuse me.

Environment

Hardware:

gpu: 8 x 2080Ti
cpu: Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz, 96 cores

Software:

nvidia-dali  0.21.0
cuda   10.0

Problems

1. The performance does not scale linearly with the GPU count when using multiple threads (one thread per GPU), especially when the GPU count is >= 4. I tested the performance with the DALI_extra data and the coco2014_val dataset respectively. The results are below.

| GPUs | DALI_extra speed (samples/s) | DALI_extra, repeat 2 | coco2014_val speed (samples/s) | coco2014_val, repeat 2 | coco2014_val, repeat 3 | coco2014_val, repeat 4 |
|---|---|---|---|---|---|---|
| 1 | 154.07 | 154.13 | 519.03 | 473.48 | 352.80 | 361.97 |
| 2 | 249.7 | 220.62 | 462.42 | 372.29 | 381.10 | 442.34 |
| 4 | 273.74 | 273.99 | 416.83 | 400.72 | 451.99 | 406.45 |
| 8 | 288.03 | 281.19 | 417.47 | 392.35 | 467.32 | 431.43 |

Note: "repeat 2", "repeat 3", and "repeat 4" denote repeated runs of the same test.
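
For reference, a rough scaling-efficiency calculation of my own, derived from the DALI_extra column above (linear scaling would be 100%):

single_gpu = 154.07  # DALI_extra speed with 1 GPU, samples/s
for gpus, speed in [(2, 249.7), (4, 273.74), (8, 288.03)]:
    # efficiency = measured speed / ideal linear speed
    print("{} GPUs: {:.0%} of linear scaling".format(gpus, speed / (gpus * single_gpu)))
# -> 2 GPUs: ~81%, 4 GPUs: ~44%, 8 GPUs: ~23%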

As the table above shows, with the DALI_extra dataset the throughput improves noticeably when going from 1 to 2 GPUs, but with 4 or 8 GPUs it is nearly the same as with 2 GPUs. Can COCOPipeline scale linearly? What can I do to improve multi-GPU performance?

2. COCOPipeline's performance is unstable on the coco2014_val dataset. There are two issues with it: first, as mentioned above, multi-GPU performance is almost the same as single-GPU, or even worse; second, the throughput fluctuates over a wide range between runs. Can you give some advice on possible reasons?

COCOPipeline definition and benchmark script

The COCOPipeline definition is as below:

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class COCOPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, 
                 file_root, annotations_file, short, long, 
                 max_num_gt=100,
                 num_gpus=1, direction="horizontal"):
        super(COCOPipeline, self).__init__(
            batch_size, num_threads, device_id, seed=15)
        assert direction in ["horizontal", "vertical"]
        self.input = ops.COCOReader(
            file_root=file_root,
            annotations_file=annotations_file,
            shard_id=device_id,
            num_shards=num_gpus,
            ratio=True,
            ltrb=True)
        self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)
        self.prospective_crop = ops.RandomBBoxCrop(
            device="cpu",
            aspect_ratio=[0.5, 2.0],
            thresholds=[0.1, 0.3, 0.5],
            scaling=[0.8, 1.0],
            ltrb=True)
        # resize
        self.resize = ops.Resize(device="gpu", interp_type=types.INTERP_LINEAR,
                                 resize_shorter=short, max_size=long)
        self.shape = ops.Shapes(device="gpu")
        # normalize and convert hwc to chw
        self.cmnp = ops.CropMirrorNormalize(
            device="gpu",
            output_dtype=types.FLOAT,
            output_layout=types.NCHW,
            image_type=types.RGB,
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255])
        self.flip = ops.Flip(device="gpu", horizontal=1)
        self.bbflip = ops.BbFlip(device="cpu", ltrb=True, horizontal=1)
        # padding axes=(0,1) -> hwc, axes=(1,2) -> chw
        if direction == "vertical":
            self.padding = ops.Pad(device="gpu", fill_value=0, axes=(1,2), shape=(long, short))    
        else:
            self.padding = ops.Pad(device="gpu", fill_value=0, axes=(1,2), shape=(short, long))
        self.bbox_padding = ops.Pad(device="cpu", fill_value=-1, axes=(0,1), shape=(max_num_gt, 4))
        self.label_padding = ops.Pad(device="cpu", fill_value=-1, axes=(0,1), shape=(max_num_gt, 1))
        self.gpu_cast = ops.Cast(device="gpu", dtype=types.FLOAT)
        self.cpu_cast = ops.Cast(device="cpu", dtype=types.FLOAT)

    def define_graph(self):
        inputs, bboxes, labels = self.input()
        images = self.decode(inputs)
        images = self.flip(images)
        bboxes = self.bbflip(bboxes)
        src_shape = self.shape(images)
        images = self.resize(images)
        resized_shape = self.shape(images)
        images = self.cmnp(images)
        images = self.padding(images)
        bboxes = self.bbox_padding(bboxes)
        labels = self.label_padding(labels)
        labels = self.cpu_cast(labels)
        src_shape = self.gpu_cast(src_shape)
        resized_shape = self.gpu_cast(resized_shape)
        return (images, bboxes, labels, src_shape, resized_shape)

Benchmark script

import os
import time
from queue import Queue
from threading import Thread

def tmp_worker(pipes, data_queues, worker_id):
    # Each thread drives one pipeline and pushes its outputs into that GPU's queue.
    while True:
        pipe_out = pipes[worker_id].run()
        data_queues[worker_id].put(pipe_out)

def multi_thread_pipe():
    # file_root = "data/coco/images/val2014"
    # annotations_file = "data/instances_val2014_horz.json"
    test_data_root = "/mnt/truenas/scratch/xiaotao.chen/Repositories/DALI_extra"
    file_root = os.path.join(test_data_root, 'db', 'coco', 'images')
    annotations_file = os.path.join(test_data_root, 'db', 'coco', 'instances.json')

    num_gpus = 1
    single_batch_size = 2
    direction = "horizontal"
    pipes = [COCOPipeline(batch_size=single_batch_size, num_threads=2, device_id=device_id,
                          file_root=file_root, annotations_file=annotations_file, 
                          short=800, long=1200, num_gpus=num_gpus, direction=direction)  for device_id in range(num_gpus)]
    for pipe in pipes:
        pipe.build()
        print("pipeline epoch size:{}".format(pipe.epoch_size()))

    pipe_out = [pipe.run() for pipe in pipes]

    num_threads = num_gpus
    data_queues = [Queue(maxsize=8) for worker_id in range(num_threads)]
    threads = [Thread(target=tmp_worker, args=(pipes, data_queues, worker_id)) for worker_id in range(num_threads)]
    for th in threads:
        th.daemon = True
        th.start()

    start_time = time.time()
    pipe_out = None
    for i in range(100):
        tic = time.time()
        pipe_out = [q.get() for q in data_queues]
        interval = time.time() - tic
        if i>0 and i%5==0:
            print("{} time: {} speed {} samples/s".format(
                   i, interval, single_batch_size*len(pipe_out) / interval))
    total_time = time.time() - start_time
    print("time:{} avg speed {} samples/s".format(total_time, 100 * single_batch_size * len(pipe_out) / total_time))

if __name__ == "__main__":
    multi_thread_pipe()
JanuszL commented 4 years ago

Hi, I will try to repro that and get back to you soon with more details.

XiaotaoChen commented 4 years ago

> Hi, I will try to repro that and get back to you soon with more details.

Thanks

XiaotaoChen commented 4 years ago

> Hi, I will try to repro that and get back to you soon with more details.

Hi, did you manage to reproduce my problems? @JanuszL

JanuszL commented 4 years ago

Hi, A few comments:

XiaotaoChen commented 4 years ago

Thanks a lot for the detailed explanation, @JanuszL. In my DALILoader, pipe_out is copied out to MXNet NDArrays. The schedule_run example is useful to me. Does schedule_run() release the GIL? I'll also try to use distributed data parallel to avoid the Python GIL and CPU bottleneck.

JanuszL commented 4 years ago

Hi,

> Does schedule_run() release GIL?

It doesn't, but it only launches work that is done in the native DALI threads and returns to Python without doing any substantial work. So even though it doesn't release the GIL, it doesn't wait for the processing result, and you can consider this operation a rather fast one.
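
For example, a per-GPU loop using this pattern could look roughly like the sketch below (the variable names are only illustrative): schedule work on all pipelines first, then collect and release the outputs.

# Schedule work on every pipeline first; schedule_run() returns almost
# immediately because the processing happens in DALI's native threads.
for pipe in pipes:
    pipe.schedule_run()

for pipe in pipes:
    out = pipe.share_outputs()   # blocks only until this pipeline's batch is ready
    # ... copy/consume `out` here; the buffers stay valid until release_outputs()
    pipe.release_outputs()       # hand the buffers back to DALI for the next batch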

XiaotaoChen commented 4 years ago

Hi,

> Does schedule_run() release GIL?
>
> It doesn't, but it only launches work that is done in the native DALI threads and returns to Python without doing any substantial work. So even though it doesn't release the GIL, it doesn't wait for the processing result, and you can consider this operation a rather fast one.

Thanks a lot @JanuszL

XiaotaoChen commented 4 years ago

@JanuszL Hi, I tried COCOPipeline with multiple processes; its performance is great and scales linearly with the GPU count. I also tried DALIGenericIterator from nvidia.dali.plugin.mxnet; it is blocked by the Python GIL too. The performance with different GPU counts is as below:

| GPU count | Speed (samples/s) | Speed, repeat (samples/s) |
|---|---|---|
| 1 | 259.91 | 354.45 |
| 2 | 261.29 | 344.21 |
| 4 | 209.75 | 317.83 |
| 8 | 228.6 | 200.32 |

There are two questions about DALIGenericIterator:

  1. Does the Python GIL cause performance to get worse as the GPU count grows? I'm not sure.
  2. I also tried HybridTrainPipe with DALIGenericIterator, whose input is an MXNet rec file for a classification task. Its performance is amazing. Why does HybridTrainPipe have no Python GIL problem?
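
A minimal sketch of the multi-process setup I mentioned above (one COCOPipeline per process; the worker function and iteration count are only illustrative, and timing code is omitted):

import multiprocessing as mp

def pipe_worker(device_id, num_gpus, file_root, annotations_file, iterations=100):
    # Each process builds and drives its own pipeline, so one process's GIL
    # cannot block the pipelines owned by the other processes.
    pipe = COCOPipeline(batch_size=2, num_threads=2, device_id=device_id,
                        file_root=file_root, annotations_file=annotations_file,
                        short=800, long=1200, num_gpus=num_gpus)
    pipe.build()
    for _ in range(iterations):
        pipe.run()

if __name__ == "__main__":
    num_gpus = 8
    file_root = "data/coco/images/val2014"
    annotations_file = "data/instances_val2014_horz.json"
    workers = [mp.Process(target=pipe_worker,
                          args=(i, num_gpus, file_root, annotations_file))
               for i in range(num_gpus)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()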

The DALIGenericIterator test script is as below:

import time
import mxnet as mx
from nvidia.dali.plugin.mxnet import DALIGenericIterator

def test_dali_mxloader():
    file_root = "data/coco/images/val2014"
    annotations_file = "data/instances_val2014_horz.json"
    num_gpus = 8
    single_batch_size = 2
    sizes = {"horizontal": 29221, "vertical": 11283}
    direction = "horizontal"
    data_size = sizes[direction]
    pipes = [COCOPipeline(batch_size=single_batch_size, num_threads=2, device_id=device_id,
                          file_root=file_root, annotations_file=annotations_file,
                          short=800, long=1200, num_gpus=num_gpus, direction=direction) for device_id in range(num_gpus)]

    # output_map must be an ordered list of (output name, "data"/"label") pairs
    output_map = [("image", "data"),
                  ("bbox", "data"),
                  ("label", "data"),
                  ("src_shape", "data"),
                  ("resized_shape", "data")]
    loader = DALIGenericIterator(pipes, output_map, size=data_size)
    data = next(loader)
    print(data)
    mx.nd.waitall()
    size = 0
    start_time = time.time()
    for i in range(100):
        tic = time.time()
        data = next(loader)
        mx.nd.waitall()
        interval = time.time() - tic
        size = len(data) * data[0].data[0].shape[0]
        if i > 0 and i % 5 == 0:
            print("{} size:{} speed: {} samples/s".format(i, size, size /interval))
    mx.nd.waitall()
    total_time = time.time() - start_time
    print("avg speed {} samples/s".format(100*size/total_time))

if __name__ == "__main__":
    test_dali_mxloader()

The HybridTrainPipe definition:

class HybridTrainPipe(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus, db_folder):
        super(HybridTrainPipe, self).__init__(batch_size, num_threads, device_id, seed=12 + device_id)
        self.input = ops.MXNetReader(path=[os.path.join(db_folder, "train.rec")], index_path=[os.path.join(db_folder, "train.idx")],
                                     random_shuffle=True, shard_id=device_id, num_shards=num_gpus)
        self.decode = ops.nvJPEGDecoder(device="mixed", output_type=types.RGB)
        self.rrc = ops.RandomResizedCrop(device="gpu", size=(224, 224))
        self.cmnp = ops.CropMirrorNormalize(device="gpu",
                                            output_dtype=types.FLOAT,
                                            output_layout=types.NCHW,
                                            crop=(224, 224),
                                            image_type=types.RGB,
                                            mean=[0.485 * 255,0.456 * 255,0.406 * 255],
                                            std=[0.229 * 255,0.224 * 255,0.225 * 255])
        self.coin = ops.CoinFlip(probability=0.5)

    def define_graph(self):
        rng = self.coin()
        self.jpegs, self.labels = self.input(name = "Reader")
        images = self.decode(self.jpegs)
        images = self.rrc(images)
        output = self.cmnp(images, mirror = rng)
        return [output, self.labels]
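
A rough sketch of how such pipelines can be wrapped for iteration with the MXNet plugin (DALIClassificationIterator is the classification counterpart of DALIGenericIterator; the db_folder path and batch size here are hypothetical):

from nvidia.dali.plugin.mxnet import DALIClassificationIterator

num_gpus = 8
db_folder = "/path/to/imagenet_rec"  # hypothetical location of train.rec / train.idx
pipes = [HybridTrainPipe(batch_size=128, num_threads=2, device_id=i,
                         num_gpus=num_gpus, db_folder=db_folder)
         for i in range(num_gpus)]
for pipe in pipes:
    pipe.build()
# size is the epoch size reported by the reader named "Reader" in define_graph
train_iter = DALIClassificationIterator(pipes, size=pipes[0].epoch_size("Reader"))
for batches in train_iter:
    # batches[i] is an mx.io.DataBatch for GPU i: batches[i].data[0], batches[i].label[0]
    pass
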
JanuszL commented 4 years ago

> Does the Python GIL cause performance to get worse as the GPU count grows? I'm not sure.

We didn't investigate that very deeply, as distributed data parallel is the approach we recommend. But this is most likely what is happening there: Python is blocked in one pipeline while the rest wait for their work to be scheduled.

> I also tried HybridTrainPipe with DALIGenericIterator, whose input is an MXNet rec file for a classification task. Its performance is amazing. Why does HybridTrainPipe have no Python GIL problem?

Do you use raw ImageNet or a resized one? Again, this is mostly my guess, but the processing of every batch may be short enough that whenever Python asks a pipeline for its outputs they are already there and there is no waiting. If there is any waiting on any pipeline, it may happen that the work on the other pipelines is already done and their output just waits for this slowest pipeline.

XiaotaoChen commented 4 years ago

> Does the Python GIL cause performance to get worse as the GPU count grows? I'm not sure.
>
> We didn't investigate that very deeply, as distributed data parallel is the approach we recommend. But this is most likely what is happening there: Python is blocked in one pipeline while the rest wait for their work to be scheduled.
>
> I also tried HybridTrainPipe with DALIGenericIterator, whose input is an MXNet rec file for a classification task. Its performance is amazing. Why does HybridTrainPipe have no Python GIL problem?
>
> Do you use raw ImageNet or a resized one? Again, this is mostly my guess, but the processing of every batch may be short enough that whenever Python asks a pipeline for its outputs they are already there and there is no waiting. If there is any waiting on any pipeline, it may happen that the work on the other pipelines is already done and their output just waits for this slowest pipeline.

About the ImageNet rec file: you are right, the rec file is resized. Thanks for your helpful reply.