triton-inference-server / dali_backend

The Triton backend that allows running GPU-accelerated data pre-processing pipelines implemented in DALI's Python API.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html

Error in thread 31: nvJPEG error (5): The user-provided allocator functions, for either memory allocation or for releasing the memory, returned a non-zero code. #198

Closed: DequanZhu closed 1 year ago

DequanZhu commented 1 year ago

My DALI pipeline, loaded by Triton, produced the following error when executing an inference request from the Triton client: Error in thread 31: nvJPEG error (5): The user-provided allocator functions, for either memory allocation or for releasing the memory, returned a non-zero code.

Below is my pipeline definition:
import nvidia.dali as dali
import nvidia.dali.types as types

BATCH_SIZE = 32
INPUT_SIZE = 640
FILL_VALUE = 114
NUM_THREAD = 128

def parse_args():
    import argparse

    parser = argparse.ArgumentParser(
        description="Serialize the pipeline and save it to a file"
    )
    parser.add_argument(
        "file_path", type=str, help="The path where to save the serialized pipeline"
    )
    return parser.parse_args()

def get_shift(images):
    # Compute the (x, y) translation that centers the "not_larger"-resized image
    # on the INPUT_SIZE x INPUT_SIZE canvas, plus the scale ratio of the resize.
    shapes = dali.fn.peek_image_shape(images)[:2]  # (H, W) without decoding
    max_size = dali.fn.reductions.max(shapes, axes=[0])
    # Per-axis mask: 1 where the dimension is the larger one. shapes is (H, W)
    # while the translation offset is (x, y), so this zeroes the shift along the
    # axis that already spans INPUT_SIZE after the resize.
    index = dali.fn.cast((shapes == max_size), dtype=types.INT64)
    min_size = dali.fn.reductions.min(shapes, axes=[0])
    scale_ratio = INPUT_SIZE / max_size
    dst_min_size = dali.fn.cast(min_size * scale_ratio, dtype=types.INT64)
    # Centering offset for the shorter side after padding to INPUT_SIZE
    shift = (INPUT_SIZE - dali.fn.stack(dst_min_size, dst_min_size)) // 2
    shift = shift * index
    scale_ratio = dali.fn.reshape(scale_ratio, shape=[1])
    return shift, scale_ratio

@dali.pipeline_def(batch_size=BATCH_SIZE, num_threads=NUM_THREAD, device_id=0)
def simple_pipeline():
    orig_image_bytes = dali.fn.external_source(device="cpu", name="orig_image_bytes")
    shift, scale_ratio = get_shift(orig_image_bytes)
    shift = dali.fn.cast(shift, dtype=types.FLOAT)
    mt = dali.fn.transforms.translation(offset=shift)
    orig_image_decode = dali.fn.decoders.image(
        orig_image_bytes, device="mixed", output_type=types.RGB
    )  # GPU ("mixed") decoding: this is where the nvJPEG allocations happen
    # Aspect-ratio-preserving resize: the longer side becomes INPUT_SIZE
    letterboxed_image = dali.fn.resize(
        orig_image_decode, size=INPUT_SIZE, mode="not_larger"
    )
    # Pad to a square canvas, then translate to center the image on it
    letterboxed_image = dali.fn.pad(
        letterboxed_image,
        axis_names="HW",
        shape=[INPUT_SIZE, INPUT_SIZE],
        fill_value=FILL_VALUE,
    )
    letterboxed_image = dali.fn.warp_affine(
        letterboxed_image, matrix=mt, fill_value=FILL_VALUE, inverse_map=False
    )
    detector_input_letterboxed_image = dali.fn.cast(
        letterboxed_image, dtype=types.FLOAT
    )
    detector_input_letterboxed_image = dali.fn.transpose(
        detector_input_letterboxed_image, perm=[2, 0, 1]
    )
    detector_input_letterboxed_image = detector_input_letterboxed_image / 255.0
    letterboxed_image = dali.fn.color_space_conversion(
        letterboxed_image, image_type=types.BGR, output_type=types.RGB
    )    
    return letterboxed_image, detector_input_letterboxed_image, shift, scale_ratio

def main(filename):
    simple_pipeline().serialize(filename=filename)

if __name__ == "__main__":
    args = parse_args()
    main(args.file_path)

It seems to be caused by the dali.fn.decoders.image function using too much GPU memory, but when I run nvidia-smi the system's GPU memory is not fully used. So what caused this problem, and how can I estimate how much GPU memory this function will use when decoding on the GPU, or how much RAM when decoding on the CPU?

JanuszL commented 1 year ago

Hi @DequanZhu,

Thank you for reaching out. Can you tell me if reducing the batch size helps? The error means that you ran out of either GPU memory or pinned (host-side) memory. However, it is very hard to determine the amount of memory needed for decoding, as it depends on the image size and the chroma subsampling. Have you also run this pipeline without Triton?
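
For reference, here is a minimal sketch (not from the original thread) of running the pipeline standalone, outside Triton, with DALI's per-operator memory statistics enabled. It assumes the script above is importable, so simple_pipeline and BATCH_SIZE are in scope, and test.jpg is a placeholder image path:

import numpy as np

# Build the pipeline directly; pipeline_def forwards Pipeline constructor
# arguments such as enable_memory_stats given at call time.
pipe = simple_pipeline(enable_memory_stats=True)
pipe.build()

# Feed the external source by hand: one encoded image repeated BATCH_SIZE times.
with open("test.jpg", "rb") as f:
    encoded = np.frombuffer(f.read(), dtype=np.uint8)
pipe.feed_input("orig_image_bytes", [encoded] * BATCH_SIZE)
pipe.run()

# Per-operator statistics, including memory reserved by the mixed decoder.
print(pipe.executor_statistics())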

DequanZhu commented 1 year ago

Hi @JanuszL, thanks for the reply. Can you tell me what batch_size means in the DALI pipeline definition @dali.pipeline_def(batch_size=BATCH_SIZE, num_threads=NUM_THREAD, device_id=0)? Is it different from the max_batch_size configuration in the Triton model's config.pbtxt file? If not, which one determines the real batch size?

JanuszL commented 1 year ago

Hi @DequanZhu,

Have you checked the DALI backend README file? As far as I understand, the max_batch_size inside the config.pbtxt file defines the maximum batch size that can be processed in the pipeline; @szalpal please correct me if I'm wrong. Still, depending on the rate of incoming requests, the number of samples processed in one batch could be smaller.
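
For illustration, a hypothetical config.pbtxt for the pipeline above (the model name and output names are assumptions; the input name must match the external_source name, and allow_ragged_batch, if your backend version supports it, lets Triton batch encoded images of different lengths):

name: "dali_preprocess"
backend: "dali"
max_batch_size: 32
input [
  {
    name: "orig_image_bytes"
    data_type: TYPE_UINT8
    dims: [ -1 ]
    allow_ragged_batch: true
  }
]
output [
  {
    name: "DALI_OUTPUT_0"  # letterboxed_image
    data_type: TYPE_UINT8
    dims: [ 640, 640, 3 ]
  },
  {
    name: "DALI_OUTPUT_1"  # detector_input_letterboxed_image
    data_type: TYPE_FP32
    dims: [ 3, 640, 640 ]
  },
  {
    name: "DALI_OUTPUT_2"  # shift
    data_type: TYPE_FP32
    dims: [ 2 ]
  },
  {
    name: "DALI_OUTPUT_3"  # scale_ratio
    data_type: TYPE_FP32
    dims: [ 1 ]
  }
]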

DequanZhu commented 1 year ago

Hi @JanuszL, actually I'm not so clear about Triton's dynamic batching mechanism. According to my understanding, dynamic batching is useful for CNN-style models whose input shape is usually [N, height, width, channels]. When such a model is loaded by Triton and N requests, each of shape [1, height, width, channels], arrive, Triton can combine them into one [N, height, width, channels] input that the model executes in a single forward pass, which improves throughput; if I'm wrong please correct me. But for my pipeline, I think dynamic batching will not work, because each input is a one-dimensional encoded-image uint8 sequence with shape [-1], and each image has a different length, so they can't be combined by Triton into one batch input of shape [N, -1]. Actually, I tested it in debug mode:

import nvidia.dali as dali
import nvidia.dali.types as types
import cv2
import numpy as np
from nvidia.dali.pipeline.experimental import pipeline_def

BATCH_SIZE = 32
INPUT_SIZE = 640
FILL_VALUE = 114
NUM_THREAD = 128

def parse_args():
    import argparse

    parser = argparse.ArgumentParser(
        description="Serialize the pipeline and save it to a file"
    )
    parser.add_argument(
        "file_path", type=str, help="The path where to save the serialized pipeline"
    )
    return parser.parse_args()

def input_data():
    image = np.random.randint(0, 256, (3072, 3072, 3), dtype=np.uint8)
    success, buffer = cv2.imencode(".png", image)  # (N, 1) uint8
    buffer = np.transpose(buffer, axes=[1, 0])     # -> (1, N)
    buffer = buffer[np.newaxis, :]                 # -> (1, 1, N)
    buffer = [buffer] * 32
    buffer = np.vstack(buffer)                     # -> (32, 1, N)
    # Note: external_source iterates over the first axis, so each iteration
    # yields a (1, N) slice, i.e. a batch containing a single encoded image.
    return buffer

def get_shift(images):
    shapes = dali.fn.peek_image_shape(images)[:2]
    max_size = dali.fn.reductions.max(shapes, axes=[0])
    index = dali.fn.cast((shapes == max_size), dtype=types.INT64)
    min_size = dali.fn.reductions.min(shapes, axes=[0])
    scale_ratio = INPUT_SIZE / max_size
    dst_min_size = dali.fn.cast(min_size * scale_ratio, dtype=types.INT64)
    shift = (INPUT_SIZE - dali.fn.stack(dst_min_size, dst_min_size)) // 2
    shift = shift * index
    scale_ratio = dali.fn.reshape(scale_ratio, shape=[1])
    return shift, scale_ratio

@pipeline_def(batch_size=BATCH_SIZE, num_threads=NUM_THREAD, device_id=0)
def simple_pipeline():
    orig_image_bytes = dali.fn.external_source(device="cpu", source=input_data(), name="orig_image_bytes")
    print(dali.fn.shapes(orig_image_bytes))
    shift, scale_ratio = get_shift(orig_image_bytes)
    shift = dali.fn.cast(shift, dtype=types.FLOAT)
    mt = dali.fn.transforms.translation(offset=shift)
    orig_image_decode = dali.fn.decoders.image(
        orig_image_bytes, device="mixed", output_type=types.RGB
    )
    letterboxed_image = dali.fn.resize(
        orig_image_decode, size=INPUT_SIZE, mode="not_larger"
    )
    letterboxed_image = dali.fn.pad(
        letterboxed_image,
        axis_names="HW",
        shape=[INPUT_SIZE, INPUT_SIZE],
        fill_value=FILL_VALUE,
    )
    letterboxed_image = dali.fn.warp_affine(
        letterboxed_image, matrix=mt, fill_value=FILL_VALUE, inverse_map=False
    )
    detector_input_letterboxed_image = dali.fn.cast(
        letterboxed_image, dtype=types.FLOAT
    )
    detector_input_letterboxed_image = dali.fn.transpose(
        detector_input_letterboxed_image, perm=[2, 0, 1]
    )
    detector_input_letterboxed_image = detector_input_letterboxed_image / 255.0
    return detector_input_letterboxed_image, shift, scale_ratio

if __name__ == "__main__":
    pipe = simple_pipeline(debug=True)
    pipe.build()
    pipe_out = pipe.run()

I printed the shape of orig_image_bytes and found that the batch size is always 1:

DataNodeDebug(
    name="shapes",
    data=TensorListCPU(
             [[28364816]],
             dtype=DALIDataType.INT64,
             num_samples=1,
             shape=[(1,)]))

I also found that the batch size has little impact on the final GPU memory usage, but num_threads has a significant impact. So is it possible that the above error is caused by a too-large num_threads? How does num_threads affect the pipeline's performance?

JanuszL commented 1 year ago

Hi @DequanZhu,

DALI's GPU image decoder allocates resources per thread, and these can consume a considerable amount of host and device memory (more than 100 MB). The 128 threads I see in your script is rather too many; usually 3-5 should do in most cases. Can you check how lowering the number impacts the memory consumption and performance in your case?
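
A quick way to check (a sketch, not from the thread; it assumes the pipeline script above is importable and that the nvidia-ml-py package is installed) is to build the same pipeline with different thread counts and read the device memory usage via pynvml:

import pynvml

pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)

# simple_pipeline comes from the script above; arguments passed at call time
# override the defaults set in the @dali.pipeline_def decorator.
for num_threads in (4, 16, 128):
    pipe = simple_pipeline(batch_size=32, num_threads=num_threads, device_id=0)
    pipe.build()
    used = pynvml.nvmlDeviceGetMemoryInfo(handle).used
    print(f"num_threads={num_threads}: {used / 2**20:.0f} MiB in use")
    del pipe  # caveat: DALI's memory pools may not return memory to the driver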

DequanZhu commented 1 year ago

By reducing the number of threads to 4, no errors occurred, and performance was not affected.