Hi,
Could you provide a full self-contained script that would reproduce the problem?
Also, I see that you create this custom stream inside the DALI pipeline. You should create it outside instead and use it for all operations on the data you feed to DALI. Otherwise, the data manipulation outside DALI happens on the PyTorch default stream while the copy happens on this custom one, and you can get unexpected results due to overlapping operations on different streams.
If the DALI internal stream is used, feed_input will block until the transfer is done, since there is no way to synchronize with this stream to prevent another stream from overwriting the array with new data.
Another reason that comes to my mind for the lack of overlap might be a blockade on the H2D transfer. ExternalSource uses H2D copies, and so do the DALI GPU operators (to transfer argument values to the kernels), so until the ExternalSource transfer is done the GPU operators will wait. You can set use_copy_kernel to True and see if that helps (it replaces cudaMemcpyAsync with a copy kernel and unblocks the DMA engine that is needed by the regular GPU ops).
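For illustration, a minimal sketch of that setup (prepare_batch and images_node are placeholders: images_node stands for whatever your ExternalSource call returns in define_graph):

import torch

copy_stream = torch.cuda.Stream()  # created once, outside the DALI pipeline

with torch.cuda.stream(copy_stream):
    batch = prepare_batch()  # placeholder: GPU-side data preparation issued on the same stream

# copy on the same custom stream; use_copy_kernel replaces cudaMemcpyAsync with
# a kernel, leaving the DMA engine free for the regular GPU operators
pipe.feed_input(images_node, batch, cuda_stream=copy_stream, use_copy_kernel=True)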
Thanks a lot for your detailed response! I tried to create the stream outside of the pipeline, but it still doesn't work. The use_copy_kernel option doesn't seem to work either. I'll try to create a script to reproduce the issue.
By the way, I'm wondering what I should see in the nsys profiling result if I set use_copy_kernel to True. I expected to see some kernels for the data transfer, but cudaMemcpyAsync is still used.
If you provide the data in non-page-locked (non-pinned) CPU memory, then use_copy_kernel may not work.
When you provide the repro script I will be able to tell more.
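For example, a sketch assuming you feed torch tensors (images_np is a placeholder for your decoded batch; .pin_memory() returns a page-locked copy, which the copy kernel can read directly):

import torch

# page-locked (pinned) source buffer, required for the copy kernel to kick in
batch_cpu = torch.from_numpy(images_np).pin_memory()
pipe.feed_input(images_node, batch_cpu, cuda_stream=copy_stream, use_copy_kernel=True)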
Hi, sorry for the late reply. Our script looks as follows. Sorry that I cannot provide the full code and data to reproduce. Our preprocessing covers image decoding, resizing, rotation and padding. For certain reasons, we decode and resize the image on the CPU, and then use DALI to do the rotation and padding. Padding is done using Crop and a shift. The training model is a modified ResNet, which is not shown here. I also tried to add a custom stream in ExternalSource for transferring images from CPU to GPU, but no overlap happened. Is there any place where we use DALI incorrectly? Any suggestion is welcome!
class MyPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, data_iter, image_shape):
        super(MyPipeline, self).__init__(batch_size=batch_size,
                                         num_threads=num_threads,
                                         device_id=device_id)
        self.data_iter = data_iter
        self.dst_h, self.dst_w = image_shape
        self.input_images = ops.ExternalSource(device="gpu")
        self.input_rotate_angle = ops.ExternalSource()
        self.input_shift_matrix = ops.ExternalSource()
        # for rotate
        self.rotate = ops.Rotate(device="gpu", fill_value=0.0)
        # for padding
        self.pad_crop = ops.Crop(device="gpu", crop_w=self.dst_w, crop_h=self.dst_h,
                                 crop_pos_x=0, crop_pos_y=0, out_of_bounds_policy="pad")
        self.pad_shift = ops.WarpAffine(device="gpu", fill_value=0.0, size=image_shape)
        # cast to torch tensor
        self.transpose = ops.Transpose(device="gpu", perm=[2, 0, 1])
        self.gpu_cast = ops.Cast(device="gpu", dtype=types.FLOAT)
        self.color_cvt = ops.ColorSpaceConversion(device="gpu", image_type=types.BGR,
                                                  output_type=types.RGB)

    def define_graph(self):
        self.images = self.input_images()
        self.rotate_angles = self.input_rotate_angle()
        self.shift_matrixs = self.input_shift_matrix()
        images = self.rotate(self.images, angle=self.rotate_angles)
        images = self.pad_crop(images)
        images = self.pad_shift(images, matrix=self.shift_matrixs)
        # from BGR to RGB; since the color conversion takes HWC data,
        # we convert the color first, then do the transpose
        images = self.color_cvt(images)
        images = self.transpose(images)
        images = self.gpu_cast(images)
        images = images / 255
        return images

    def iter_setup(self):
        try:
            data = next(self.data_iter)
            images = data['image']
            rotate_angles = data["rotate"]
            shift_matrixs = data["shift"]
            self.feed_input(self.images, images)
            self.feed_input(self.rotate_angles, rotate_angles)
            self.feed_input(self.shift_matrixs, shift_matrixs)
        except StopIteration:
            raise
# The DistLineReadingDataset is just used to read a file line by line
class DaliTestDataset(DistLineReadingDataset):
    def __init__(self, all_files, image_shape, rank=0, world_size=1, **kwargs):
        is_hdfs = all_files[0].startswith('hdfs')
        self.image_shape = image_shape
        super(DaliTestDataset, self).__init__(all_files=all_files, is_hdfs=is_hdfs,
                                              shuffle=False, shard=(rank, world_size))

    def __iter__(self):
        # self.generate yields each line read from a file
        for example in self.generate():
            try:
                if example:
                    yield self.__transform_(example)
            except Exception as e:
                logging.error(traceback.format_exc())
                logging.error('encountered broken data: %s' % e)

    def __transform_(self, example):
        projected_example = json.loads(example)
        # decode image
        raw_img = self.load_image(projected_example['image_content'])
        # get resize, rotate and padding params
        img_resize_shape, img_trans_param = self.get_process_param(raw_img)
        # resize image on the CPU
        img = cv2.resize(raw_img, dsize=img_resize_shape)
        res = {
            'image': img,
            'image_trans_param': img_trans_param,
        }
        return res

    def b64_decode(self, string):
        if isinstance(string, str):
            string = string.encode()
        return base64.decodebytes(string)

    def load_image(self, img_bin):
        img_bin = self.b64_decode(img_bin)
        img_data = np.frombuffer(img_bin, dtype=np.uint8)
        img = cv2.imdecode(img_data, cv2.IMREAD_COLOR)
        return img

    # calculate the preprocessing parameters
    def get_process_param(self, image):
        h, w = image.shape[:2]
        rotate_angle = 0.0
        dst_h, dst_w = self.image_shape
        # rotate when height < width
        if h < w:
            rotate_angle = -90.0
            dst_h, dst_w = dst_w, dst_h
        # calculate shift deltas for padding
        ratio_src = w / h
        ratio_dst = dst_w / dst_h
        dx, dy = 0, 0
        if ratio_src >= ratio_dst:
            tmp_h = math.floor((dst_w / w) * h)
            tmp_w = dst_w
            dy = (dst_h - tmp_h) // 2
        else:
            tmp_w = math.floor((dst_h / h) * w)
            tmp_h = dst_h
            dx = (dst_w - tmp_w) // 2
        if rotate_angle != 0.0:
            dx, dy = dy, dx
        shift_matrix = np.array([[1, 0, dx], [0, 1, dy]])
        return (tmp_w, tmp_h), (rotate_angle, shift_matrix)

    # create a batch of data
    def collect_fn(self, data):
        image = []
        rotate_angles = []
        shift_matrixs = []
        for ibatch in data:
            image.append(ibatch['image'])
            rotate_angles.append(ibatch['image_trans_param'][0])
            shift_matrixs.append(ibatch['image_trans_param'][1])
        result = {
            'image': image,
            'rotate': np.array(rotate_angles, dtype=np.float32),
            'shift': np.array(shift_matrixs, dtype=np.float32),
        }
        return result
data_path = "xxxx"
batch_size = 32
target_width = 360
target_height = 480
num_worker = 8

all_files = hlist_files([data_path])
dataset = DaliTestDataset(all_files, image_shape=(target_height, target_width))
dataloader = DataLoader(dataset, batch_size=batch_size, num_workers=num_worker,
                        collate_fn=dataset.collect_fn)
pipe = MyPipeline(batch_size=batch_size, num_threads=4, device_id=0,
                  data_iter=iter(dataloader),
                  image_shape=(target_height, target_width))
pipe.build()
dali_iter = DALIGenericIterator(pipelines=pipe,
                                output_map=['image'],
                                size=-1,
                                auto_reset=True,
                                dynamic_shape=True)
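For completeness, the iterator is consumed roughly like this (model and optimizer are placeholders here, not our real code):

for batch in dali_iter:
    images = batch[0]['image']   # already a CUDA float tensor, NCHW
    loss = model(images).mean()  # placeholder forward pass / loss
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()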
I was asking not necessarily for your production code, but for code that illustrates your problem (e.g., get data on the CPU, use ExternalSource, run the GPU pipeline) and is self-contained, so we can run it and get the same results as you do. Getting back to the clue of the problem: is it possible to share an image from the profiler? As I understand it, the lack of overlap is observed between DALI and the training? Or between the call to the external source and the DALI GPU code execution?
Absolutely! The image below shows the profiling result. The missing overlap is between the data transfer (shown in the red rectangle) and the model training computation. The DALI GPU execution is overlapped with the training part, which is quite nice. I'd like to put the data transfer on another stream, but as we can see from the image, it is still on the default one.
Hi, looking into the code and profile, I don't think there is much you can do. The problem is that you are transferring data from the CPU to the GPU using non-page-locked (non-pinned) memory. Such a copy is always synchronous, and DALI will wait for it before scheduling the processing of the next batch. However, it is not that bad, as you already have one batch of data ready ahead of time: as you see in your profile, you can train on one batch while the next one, uploaded using ExternalSource, is being processed. What you can try is to replace ops.ExternalSource(device="gpu") with ops.ExternalSource(device="cpu") and self.images = self.input_images().gpu(), and let the H2D transfer happen during the pipeline execution (you need to check if it yields any benefit in your case); then the only thing you are waiting for is an H2H copy, which should be much faster.
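A minimal sketch of the change (the rest of MyPipeline stays as it is):

# in __init__: keep the source on the CPU
self.input_images = ops.ExternalSource(device="cpu")

# in define_graph: the H2D copy now happens as part of the pipeline execution
self.images = self.input_images().gpu()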
Thanks a lot for your suggestions! I tried ops.ExternalSource(device="cpu") with self.images = self.input_images().gpu() and it worked! The cudaMemcpy is now issued on another stream instead of the default one. And just a quick question about CropMirrorNormalize: would the 'Crop' in CropMirrorNormalize do the padding if the given crop size is larger than the original image size, e.g. pad a (350, 470) image to (360, 480)? I know the Crop op does this, but I'm not sure if the same applies to CropMirrorNormalize.
Please check the out_of_bounds_policy argument of the CropMirrorNormalize operator. It should do what you are asking for.
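For illustration, a sketch along the lines of your existing pad_crop (argument names can differ between DALI versions, so please check the docs for your release):

self.pad_cmn = ops.CropMirrorNormalize(device="gpu",
                                       crop_w=self.dst_w, crop_h=self.dst_h,
                                       out_of_bounds_policy="pad",
                                       fill_values=0,
                                       dtype=types.FLOAT,
                                       output_layout="CHW")

Note that this single operator could also take over your Transpose and Cast steps.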
Got it, thanks! By the way, another quick question: is it possible to allow crop_pos_x and crop_pos_y to be negative, so that the padding produced by Crop can surround the image rather than only appear on the right and bottom sides? Right now I use a Crop followed by a WarpAffine (just a shift) to pad the image so that it is centered. Or is there any other recommended way to do this with DALI? Many thanks!
My bad, the question was silly: the actual crop position is calculated from crop_pos_x and crop_pos_y. I think I know how to do it now. Thanks a lot for your help!
@shanesyy-1992 I have a few remarks:
- Crop already centers the window by default, so if your crop dimensions are bigger than the image, you'll end up with a padded image in which the original image is centered.
- Crop doesn't accept negative anchor coordinates, but Slice does. The Crop arguments crop_pos_x and crop_pos_y are defined so that 0.0 means "aligned to the left (or top)", 1.0 means "aligned to the right (or bottom)", and 0.5 means "centered". Slice, however, accepts actual coordinates (either relative or absolute). If you want your anchor to be at a particular offset, you could use the Slice operator, as sketched below.
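For illustration (a sketch; the per-sample anchors and shapes would be fed in, e.g. via ExternalSource):

# Crop: a bigger-than-image window plus the "pad" policy gives centered padding
centered_pad = ops.Crop(device="gpu", crop_w=360, crop_h=480,
                        out_of_bounds_policy="pad")

# Slice: explicit, possibly negative, absolute anchors
shifted_pad = ops.Slice(device="gpu", out_of_bounds_policy="pad",
                        normalized_anchor=False, normalized_shape=False)
# in define_graph: out = shifted_pad(images, anchors, shapes)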
I hope this helps.
Very valuable information! Thanks a lot!
Hi, thanks for the great work! I'm using DALI to do the preprocessing for my training model, and I want to overlap the CPU-to-GPU data transfer with the GPU computation. I tried to use a custom stream, instead of the default one, for the pipeline's feed_input, but after profiling with Nsight Systems the HtoD memcpy still happens on the default stream. I'm not sure if I did it correctly. The pipeline looks as follows, and I'm using PyTorch:
I also tried to set cuda_stream = -1 to test the DALI internal stream, but according to the Nsight Systems profile, the data transfer still stays on the default stream. Any help is appreciated; thanks a lot in advance.
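A minimal sketch of the feeding code I tried (the stream name is illustrative; the full pipeline is the one posted above):

import torch

copy_stream = torch.cuda.Stream()

def iter_setup(self):
    data = next(self.data_iter)
    # ask DALI to run the H2D copy on the custom stream
    self.feed_input(self.images, data['image'], cuda_stream=copy_stream)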