NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0

How can I use one pipeline's outputs as another pipeline's inputs in an ML workflow? #5070

Open husterdjx opened 11 months ago

husterdjx commented 11 months ago

Describe the question.

Hello! I'm trying to build an ML workflow using NVIDIA DALI to preprocess and transfer images.

First, I build a pipeline that preprocesses image files into a batch input for the PyTorch SSD model. The definition looks like this:

from nvidia.dali import pipeline_def, fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch import feed_ndarray

@pipeline_def
def simple_pipeline(resize_flag=True, resize_x=300, resize_y=300):
    jpegs, labels = fn.readers.file(file_root='./images',
                                    random_shuffle=True,
                                    name="Reader")
    images = fn.decoders.image(jpegs, device="mixed", output_type=types.RGB)

    # do preprocess here

    # Resize images
    if resize_flag:
        images = fn.resize(images, resize_x=resize_x, resize_y=resize_y, device="gpu")

    # Normalize images
    mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]  # DALI uses 0-255 scale for images
    std = [0.229 * 255, 0.224 * 255, 0.225 * 255]

    images = fn.crop_mirror_normalize(images,
                                      mean=mean,
                                      std=std,
                                      dtype=types.FLOAT,  # output_dtype is the deprecated alias
                                      device="gpu")

    return images, labels

pipe = simple_pipeline(batch_size=4, num_threads=3, device_id=0)
pipe.build()

_images, _ = pipe.run()

Check the type and shape:

print("-----_images type: ", type(_images)) # nvidia.dali.backend_impl.TensorListGPU
print("-----_images.dtype: ", _images.dtype) # DALIDataType.FLOAT
print("-----_images.shape: ", _images.shape()) # [(3, 300, 300), (3, 300, 300), (3, 300, 300), (3, 300, 300)]

_images_dali_tensor = _images.as_tensor()
print("-----_images_dali_tensor type: ", type(_images_dali_tensor)) # <class 'nvidia.dali.backend_impl.TensorGPU'>
print("-----_images_dali_tensor.dtype: ", _images_dali_tensor.dtype) # DALIDataType.FLOAT
print("-----_images_dali_tensor.shape: ", _images_dali_tensor.shape()) # [4, 3, 300, 300]

If I use feed_ndarray to copy _images_dali_tensor into a torch.Tensor created by torch.empty and push it into the PyTorch SSD model, the output looks OK to me.
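
For reference, a minimal sketch of that copy step (ssd_input is a hypothetical name; shape, dtype, and device follow the prints above):

import torch
from nvidia.dali.plugin.pytorch import feed_ndarray

# Pre-allocate the destination on the same GPU, matching shape and dtype.
ssd_input = torch.empty((4, 3, 300, 300), dtype=torch.float32, device="cuda:0")
feed_ndarray(_images_dali_tensor, ssd_input)  # device-to-device copy, no host round-trip
# ssd_input can now be fed to the PyTorch SSD model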

However, I want to consume the output of simple_pipeline directly, so that I can crop and resize images without an extra copy and extra GPU memory (I want to crop and resize the images as inputs to PyTorch ResNet50). When I try fn.external_source, I get lots of errors:

@pipeline_def
def dali_resnet_pipeline(ssd_bboxes, resize_flag=True, resize_x=224, resize_y=224):
    # Use ExternalSource for images and ssd_bboxes
    images = fn.external_source(source=_images, device="gpu", dtype=types.FLOAT)
    print("images type: ", type(images)) # <class 'nvidia.dali.pipeline.DataNode'>

    lefts, bots, rights, tops = ssd_bboxes[:, 0], ssd_bboxes[:, 1], ssd_bboxes[:, 2], ssd_bboxes[:, 3]

    cropped_images = fn.crop(images, crop_pos_x=lefts, crop_pos_y=bots, crop_w=rights-lefts, crop_h=tops-bots, device="gpu")

    if resize_flag:
        cropped_and_resized_images = fn.resize(cropped_images, resize_x=resize_x, resize_y=resize_y)

    return cropped_and_resized_images

I'm not at all sure that I'm doing the right thing with fn.external_source and fn.crop, or how to use DALI to build a multi-stage ML workflow, so if anyone can help me out, I'd really appreciate it!


JanuszL commented 11 months ago

Hi @husterdjx,

Thank you for reaching out. As I understand it, you want to split your flow into multiple stages for simplicity. If you care about performance, I would recommend keeping as many transformations as possible inside one processing graph. Also, print("images type: ", type(images)) inside the pipeline definition only tells you that images is a graph node; if you want to see the actual data, please check the debug mode. In general, it is possible to pass the output of one DALI pipeline to another, like this:

import nvidia.dali.fn as fn
from nvidia.dali import pipeline_def

# Producer: reads and decodes a test image.
@pipeline_def(device_id=0)
def input_pipe():
    images, labels = fn.readers.file(files=['DALI_extra/db/single/jpeg/100/swan-3584559_640.jpg'], labels=[0])
    images = fn.decoders.image(images, device='mixed')
    return images, labels

ip = input_pipe(batch_size=2, num_threads=1, device_id=0)
ip.build()

# Consumer: pulls whole batches from the producer through an external-source callback.
@pipeline_def(device_id=0)
def output_pipe(data_source=None):
    images, labels = fn.external_source(source=data_source, num_outputs=2, device='gpu')
    images = fn.resize(images, size=[100, 100])
    return images, labels

def input_iter():
    # called by the consumer each time it needs the next batch
    return ip.run()

op = output_pipe(batch_size=2, num_threads=1, device_id=0, data_source=input_iter)
op.build()
o = op.run()
print(o)
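
On the debug mode mentioned above, a minimal sketch (assuming DALI's experimental debug mode; debug_pipe is a made-up name): with debug=True the pipeline executes eagerly, so print shows the actual data instead of a graph node.

@pipeline_def(batch_size=2, num_threads=1, device_id=0, debug=True)
def debug_pipe():
    images, labels = fn.readers.file(files=['DALI_extra/db/single/jpeg/100/swan-3584559_640.jpg'], labels=[0])
    images = fn.decoders.image(images, device='mixed')
    print(images)  # prints a DataNodeDebug carrying real data: dtype, layout, shapes
    return images, labels

dp = debug_pipe()
dp.build()
dp.run()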

Another option is described in https://github.com/NVIDIA/DALI/issues/3702.

husterdjx commented 11 months ago

Thanks a million, it works!! @JanuszL After adding your suggested code, my code runs fine.

I still have another question: how can I take each image in a batch (represented as a TensorListGPU) and crop multiple smaller images from it according to box coordinates that I provide? For example, I have an input batch (containing 4 images) and a group of boxes for each image:

@pipeline_def(batch_size=max_batch_size, num_threads=1, device_id=0, debug=True)
def dali_resnet_pipeline(ssd_boxes):

    input = fn.external_source(name="pipe2_input", device="gpu")
    print("[input]:\n", input)

    print("ssd_boxes:", ssd_boxes)

    cropped_images = fn.crop(input, ...)

    ...

    return ...

Printing input:

[input]:
 DataNodeDebug(
    name="pipe2_input",
    data=TensorListGPU(
             ...,
             dtype=DALIDataType.UINT8,
             layout="HWC",
             num_samples=4,
             shape=[(300, 300, 3), (300, 300, 3), (300, 300, 3), (300, 300, 3)]))

Printing ssd_boxes:

ssd_boxes: [[[32.2206974029541, 6.715956330299377, 252.29734182357788, 276.8933415412903]], [[32.2206974029541, 6.715956330299377, 252.29734182357788, 276.8933415412903]], [[32.2206974029541, 6.715956330299377, 252.29734182357788, 276.8933415412903]], [[32.2206974029541, 6.715956330299377, 252.29734182357788, 276.8933415412903]]]

Here, each image has only one box that needs to be cropped. However, if each image in a batch has more than one box (one image generating more than one cropped image), how can I crop these smaller regions efficiently?

Thanks again for your previous help!

JanuszL commented 11 months ago

Hi @husterdjx,

Currently, DALI doesn't have a dedicated multi-crop operator. What you can do (assuming that you feed a separate pipeline with the data) is flatten all the boxes and duplicate the images accordingly:

def input_iter():
    imgs, boxes = ip.run()
    # here image 1 has two boxes, so it appears twice in the flattened batch
    return ([imgs[0], imgs[1], imgs[1]], [boxes[0], boxes[1], boxes[1]])
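
For illustration, a hedged sketch of that flattening (my construction, not code from the thread): it swaps fn.crop for fn.slice, which accepts per-sample absolute crop windows, and assumes the [left, bottom, right, top] pixel box layout shown in the printout above; flatten, multi_crop_pipe, and the max batch size of 16 are all made-up names/values.

import numpy as np
import nvidia.dali.fn as fn
from nvidia.dali import pipeline_def

# Filled in before each run: one (image, box) pair per sample, so an image
# with k boxes appears k times in the flattened batch.
batch = {"imgs": [], "anchors": [], "shapes": []}

def flatten(imgs, per_image_boxes):
    batch["imgs"], batch["anchors"], batch["shapes"] = [], [], []
    for i, boxes in enumerate(per_image_boxes):
        for left, bot, right, top in boxes:
            batch["imgs"].append(imgs[i])  # indexing a TensorListGPU copies a pointer, not pixels
            batch["anchors"].append(np.array([left, bot], dtype=np.float32))
            batch["shapes"].append(np.array([right - left, top - bot], dtype=np.float32))

@pipeline_def(batch_size=16, num_threads=1, device_id=0)  # 16 = assumed max total box count
def multi_crop_pipe():
    images = fn.external_source(source=lambda: batch["imgs"], device="gpu")
    anchor = fn.external_source(source=lambda: batch["anchors"], device="cpu")
    shape = fn.external_source(source=lambda: batch["shapes"], device="cpu")
    # fn.slice crops a per-sample window; with normalization off, anchor and
    # shape are absolute pixel coordinates in (W, H) order.
    cropped = fn.slice(images, anchor, shape,
                       normalized_anchor=False, normalized_shape=False)
    return fn.resize(cropped, size=[224, 224])
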
husterdjx commented 11 months ago

Very helpful! @JanuszL

I have completed the majority of my work thanks to your effective advice!

What should I do if I want to analyze DALI's memory usage? For example, how can I get the GPU memory usage of a nvidia.dali.backend_impl.TensorListGPU / nvidia.dali.backend_impl.TensorGPU, or of another DataNode object?

I'm trying to determine whether a multi-crop operation done in the way described above will increase memory usage on the GPU.

Looking forward to your kind reply!!

JanuszL commented 11 months ago

Hi @husterdjx,

It should not. When you index the tensor list, you just get a pointer to a particular tensor. To limit memory usage you can also enable no_copy in the external source operator. However, DALI doesn't expose internal statistics regarding memory consumption. You can use nvidia-smi (or the NVML Python bindings) to get the total GPU memory consumption; still, DALI uses a memory pool, so the reported consumption will be within the pool's size margin rather than exact.
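
For the measurement part, a minimal sketch using the pynvml bindings mentioned above; because of the memory pool, the delta reflects pool growth, not exact per-tensor usage:

import pynvml  # the NVML Python bindings (pip install nvidia-ml-py)

pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)  # GPU 0
before = pynvml.nvmlDeviceGetMemoryInfo(handle).used

# ... run the pipelines under test here ...

after = pynvml.nvmlDeviceGetMemoryInfo(handle).used
print(f"GPU memory delta: {(after - before) / 2**20:.0f} MiB")
pynvml.nvmlShutdown()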

husterdjx commented 11 months ago

Thanks a lot!! @JanuszL

husterdjx commented 11 months ago

@JanuszL Hi, sorry to bother you, but I got another error when using the method from #3702. If I remove the debug flag from the second pipeline, fn.external_source throws an exception:

Traceback (most recent call last):
  File "/workspace/expriment/ml-workflow/traffic/dali_ssd_resnet.py", line 134, in <module>
    _, resnet_input_images = pipe2.run() 
  File "/opt/conda/lib/python3.10/site-packages/nvidia/dali/pipeline.py", line 1113, in run
    return self.outputs()
  File "/opt/conda/lib/python3.10/site-packages/nvidia/dali/pipeline.py", line 956, in outputs
    return self._outputs()
  File "/opt/conda/lib/python3.10/site-packages/nvidia/dali/pipeline.py", line 1040, in _outputs
    return self._pipe.Outputs()
RuntimeError: Critical error in pipeline:
Exception in CPU stage: [/opt/dali/dali/pipeline/executor/executor.cc:629] Failed to acquire the next batch. Make sure, that DALI Pipeline is fed with sufficient amount of data. Attempted to peek the data batch that doesn't exist. Add more elements to the DALI input operator.
Stacktrace (10 entries):
[frame 0]: /opt/conda/lib/python3.10/site-packages/nvidia/dali/libdali.so(+0xe258b) [0x7f6a02fac58b]
[frame 1]: /opt/conda/lib/python3.10/site-packages/nvidia/dali/libdali.so(dali::Executor<dali::AOT_WS_Policy<dali::UniformQueuePolicy>, dali::UniformQueuePolicy>::InferBatchSize(std::vector<dali::BatchSizeProvider*, std::allocator<dali::BatchSizeProvider*> > const&) const+0x25f) [0x7f6a03018bbf]
[frame 2]: /opt/conda/lib/python3.10/site-packages/nvidia/dali/libdali.so(dali::Executor<dali::AOT_WS_Policy<dali::UniformQueuePolicy>, dali::UniformQueuePolicy>::PreRun()+0x14) [0x7f6a030192b4]
[frame 3]: /opt/conda/lib/python3.10/site-packages/nvidia/dali/libdali.so(dali::Executor<dali::AOT_WS_Policy<dali::UniformQueuePolicy>, dali::UniformQueuePolicy>::RunCPUImpl(unsigned long)+0x1e) [0x7f6a0302655e]
[frame 4]: /opt/conda/lib/python3.10/site-packages/nvidia/dali/libdali.so(dali::Executor<dali::AOT_WS_Policy<dali::UniformQueuePolicy>, dali::UniformQueuePolicy>::RunCPU()+0x21) [0x7f6a03027711]
[frame 5]: /opt/conda/lib/python3.10/site-packages/nvidia/dali/libdali.so(+0x11da65) [0x7f6a02fe7a65]
[frame 6]: /opt/conda/lib/python3.10/site-packages/nvidia/dali/libdali.so(+0x19873c) [0x7f6a0306273c]
[frame 7]: /opt/conda/lib/python3.10/site-packages/nvidia/dali/libdali.so(+0x7a9fd0) [0x7f6a03673fd0]
[frame 8]: /lib/x86_64-linux-gnu/libpthread.so.0(+0x76db) [0x7f6aac5ee6db]
[frame 9]: /lib/x86_64-linux-gnu/libc.so.6(clone+0x3f) [0x7f6aabb7261f]

Current pipeline object is no longer valid.

My code:

# DALI
@pipeline_def
def simple_pipeline(resize_flag=True, resize_x=300, resize_y=300):
    pngs, labels = fn.readers.file(file_root='./images',
                                    random_shuffle=True,
                                    file_filters="*.png")
    images = fn.decoders.image(pngs, device="mixed", output_type=types.RGB)

    # do preprocess here

    # Resize images
    if resize_flag:
        resized_images = fn.resize(images, resize_x=resize_x, resize_y=resize_y, device="gpu")

    # Normalize images
    mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]  # DALI uses 0-255 scale for images
    std = [0.229 * 255, 0.224 * 255, 0.225 * 255]

    normalized_images = fn.crop_mirror_normalize(resized_images,
                                                 mean=mean,
                                                 std=std,
                                                 dtype=types.FLOAT,
                                                 device="gpu")

    return normalized_images, resized_images

@pipeline_def(batch_size=max_batch_size, num_threads=1, device_id=0)
def dali_resnet_pipeline():
    input = fn.external_source(name="pipe2_input", device="gpu")
    # print(input)

    cropped_image = fn.crop(input, crop=(100, 100), crop_pos_x=150/300, crop_pos_y=150/300, device="gpu")
    cropped_resized_image = fn.resize(cropped_image, resize_x=224, resize_y=224, device="gpu")

    # Normalize images
    mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]  # DALI uses 0-255 scale for images
    std = [0.229 * 255, 0.224 * 255, 0.225 * 255]

    ret = fn.crop_mirror_normalize(cropped_resized_image,
                                   mean=mean,
                                   std=std,
                                   dtype=types.FLOAT,
                                   device="gpu")

    return cropped_resized_image, ret

pipe = simple_pipeline(batch_size=4, num_threads=3, device_id=0)
pipe.build()

pipe2 = dali_resnet_pipeline(batch_size=4, num_threads=1, device_id=0)
pipe2.build()

_images, _resized_images = pipe.run()

pipe2.feed_input("pipe2_input", _resized_images)
_, resnet_input_images = pipe2.run()
...

If I enable debug mode in dali_resnet_pipeline, it works fine and I can print the input:

DataNodeDebug(
    name="pipe2_input",
    data=TensorListGPU(
             ...,
             dtype=DALIDataType.UINT8,
             layout="HWC",
             num_samples=4,
             shape=[(300, 300, 3), (300, 300, 3), (300, 300, 3), (300, 300, 3)]))

JanuszL commented 11 months ago

Hi @husterdjx,

As DALI performs asynchronous prefetching, you need to either turn it off by setting exec_async=False in the second pipeline, or call feed_input prefetch_queue_depth + 1 times ahead of the first run of the second pipeline (more details here).
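
A minimal sketch of the pre-feeding option against the code above, assuming the default prefetch_queue_depth of 2 (so prefetch_queue_depth + 1 = 3 batches):

# Pre-fill the consumer's prefetch queue before its first run.
for _ in range(3):
    _images, _resized_images = pipe.run()
    pipe2.feed_input("pipe2_input", _resized_images)

_, resnet_input_images = pipe2.run()  # the CPU stage now finds data waiting
# From then on, feed one new batch per pipe2.run() to keep the queue topped up.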

husterdjx commented 11 months ago

Thanks for your help! @JanuszL I have solved this problem. However, when I measure the time cost of these two pipelines, the first pipeline takes 400 ms+, while the second takes less than 10 ms to complete.

@pipeline_def(device_id=device_id)
def simple_pipeline(resize_flag=True, resize_x=300, resize_y=300):
    pngs, labels = fn.readers.file(file_root='./images',
                                    random_shuffle=True,
                                    file_filters="*.png")
    images = fn.decoders.image(pngs, device="mixed", output_type=types.RGB)

    # do preprocess here

    # Resize images
    if resize_flag:
        resized_images = fn.resize(images, resize_x=resize_x, resize_y=resize_y, device="gpu")

    # Normalize images
    mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]  # DALI uses 0-255 scale for images
    std = [0.229 * 255, 0.224 * 255, 0.225 * 255]

    normalized_images = fn.crop_mirror_normalize(resized_images,
                                                 mean=mean,
                                                 std=std,
                                                 dtype=types.FLOAT,
                                                 device="gpu")

    return normalized_images, resized_images
...
pipe = simple_pipeline(batch_size=4, num_threads=8, device_id=device_id)
pipe.build()

pipe2 = dali_resnet_pipeline(batch_size=4, num_threads=8, device_id=device_id)
pipe2.build()

time1 = time.time()
_images, resized_images = pipe.run()
torch.cuda.synchronize()  # finish the GPU work before stopping the timer
time2 = time.time()
print("[EXPR]DALI first pipeline time: ", ((time2 - time1) * 1000), "ms")  # more than 400ms

I suspect there may be DALI initialization or cold-start overhead here. What can I do to avoid it?

Thanks again!

JanuszL commented 11 months ago

Hi @husterdjx,

There are multiple factors contributing to startup time, like GPU initialization, memory allocations, etc. What you can do is create the pipeline a first time (using some test data), destroy it, and then recreate it for the data you intend to consume. Still, the DALI pipeline is not meant for rapid creation; if you want to change data dynamically during the run, please use the external source operator.
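
A hedged sketch of that warm-up strategy (the throwaway pipeline and the timing placement are my additions):

import time
import torch

# Warm-up: pay GPU context creation and memory-pool growth once, on a
# throwaway pipeline over some test data, then discard it.
warmup = simple_pipeline(batch_size=4, num_threads=8, device_id=device_id)
warmup.build()
warmup.run()
del warmup

pipe = simple_pipeline(batch_size=4, num_threads=8, device_id=device_id)
pipe.build()

time1 = time.time()
pipe.run()
torch.cuda.synchronize()  # include the GPU work in the measurement
print("[EXPR]first run after warm-up: ", (time.time() - time1) * 1000, "ms")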