NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0

Question: the difference between DALIGenericIterator and DALICOCOIterator #1215

Closed: ZJU-lishuang closed this issue 4 years ago

ZJU-lishuang commented 5 years ago

DALICOCOIterator https://github.com/NVIDIA/DeepLearningExamples/blob/master/PyTorch/Detection/SSD/src/coco_pipeline.py#L196 DALIGenericIterator https://github.com/NVIDIA/DALI/blob/master/dali/python/nvidia/dali/plugin/pytorch.py#L165

Why is synchronization needed here? I wonder about that. Can you help me?

awolant commented 5 years ago

Hello, thanks for the question. I think that the synchronization in the first link is a leftover from some previous version. I'm not aware of any current issues with the version without it. In general, please use DALIGenericIterator from our repository. It is actively tested and maintained.

ZJU-lishuang commented 5 years ago

I have used DALIGenericIterator for my work, but I found that it doesn't support labels and bboxes of different dimensions (I use the COCOReader operator). So I rewrote it based on DALICOCOIterator, which is where this question comes from.

May I ask you a question? Is there a good way to solve this problem?

I just wrote some poor code for my work. It works, but I am not satisfied with it: whenever the output_map changes, I have to change the iterator code too.

awolant commented 5 years ago

DALIGenericIterator supports the dynamic_shape parameter now. It should be possible to use it to return tensors with a different shape every iteration, like boxes and labels in detection scenarios.
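For example, a minimal sketch of passing the flag (pipe and epoch_size are assumed to come from your own setup):

from nvidia.dali.plugin.pytorch import DALIGenericIterator

# dynamic_shape=True lets the returned torch tensors be re-allocated when the
# pipeline output shape changes between iterations (names below are hypothetical)
train_iter = DALIGenericIterator([pipe],
                                 ["images", "boxes", "labels"],
                                 size=epoch_size,
                                 dynamic_shape=True)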

ZJU-lishuang commented 5 years ago

I think there is a bug in DALIGenericIterator. When I use the DALIGenericIterator function, the error is as follows: category_tensors[category] = out.as_tensor() RuntimeError: [/opt/dali/dali/pipeline/data/tensor_list.h:386] Assert on "this->IsDenseTensor()" failed: All tensors in the input TensorList must have the same shape and be densely packed.

I think as_tensor() can't be used with tensors of different shapes, and that is the reason.
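A minimal illustration of the failure mode (the pipeline and the output index are assumptions):

# as_tensor() flattens a whole TensorList into one dense tensor, which asserts
# that every sample has the same shape; with per-sample box counts this fails
pipe_out = pipe.run()
boxes_tl = pipe_out[1]        # TensorList of boxes, e.g. shapes (3, 4), (7, 4), ...
boxes = boxes_tl.as_tensor()  # RuntimeError: Assert on "this->IsDenseTensor()" failed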

Do you agree with me?

ZJU-lishuang commented 5 years ago

I have tested DALICOCOIterator and DALIGenericIterator with everything else the same. DALICOCOIterator works fine, while DALIGenericIterator doesn't work as expected.

awolant commented 5 years ago

You are right. There is an issue with DALIGenericIterator in this context. Thanks for finding it.

ZJU-lishuang commented 5 years ago

Do you plan to fix it soon?

awolant commented 5 years ago

I've looked a bit deeper into this issue, and from the code it seems that the current implementation of DALIGenericIterator was never supposed to work in this context. It relies heavily on the assumption that the TensorLists returned from the pipeline are dense, i.e. all of the samples have the same shape. dynamic_shape gives the ability to change the sample shape from iteration to iteration, but all of the samples in the batch must still have the same shape. DALICOCOIterator is custom code that was written specifically to lift this constraint for detection data. It copies samples one by one, so they don't have to have the same shape. If you need this capability, it's fine to follow an approach like the one in DALICOCOIterator. To sum up, from the DALI perspective this is an enhancement rather than a bug. Tracked internally as DALI-1037. If you are willing to contribute, we would be happy to review and merge your PR.
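The two copy strategies side by side, as a rough sketch (feed_ndarray is the copy helper from nvidia.dali.plugin.pytorch; tensor_list, torch_batch and torch_samples are assumed placeholders):

from nvidia.dali.plugin.pytorch import feed_ndarray

# DALIGenericIterator: one copy of the whole dense batch
batch = tensor_list.as_tensor()       # asserts that all samples share a shape
feed_ndarray(batch, torch_batch)      # single contiguous copy

# DALICOCOIterator: one copy per sample, so shapes may differ within the batch
for k in range(len(torch_samples)):
    feed_ndarray(tensor_list.at(k), torch_samples[k])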

ZJU-lishuang commented 5 years ago

I think each sample usually has a different number of labels and bboxes. So if we use 'COCOReader', we can't use 'DALIGenericIterator' most of the time. Do you agree with me?

awolant commented 5 years ago

Yes, they won't go together like that. COCOReader and the other detection-related operators were developed to support the SSD network. SSD uses box encoding, so the resulting output has a constant size for every sample; that's why the feature we are discussing was not needed. As you may notice, doing it with a single copy is faster, which is why it was done this way in the first place. That said, it's definitely a desirable feature that would make the iterators more versatile.
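For context, a rough sketch of why the SSD output is constant-size (default_boxes is an assumed, precomputed flat list of anchor coordinates; in practice the op would live in a pipeline's __init__ and be called in define_graph):

import nvidia.dali.ops as ops

# BoxEncoder matches each ground-truth box to a fixed set of anchors, so after
# encoding every sample carries the same number of boxes and the batch is dense
box_encoder = ops.BoxEncoder(device="cpu", criteria=0.5, anchors=default_boxes)
encoded_boxes, encoded_labels = box_encoder(boxes, labels)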

ZJU-lishuang commented 5 years ago

Hmm, what do you mean by "doing it with a single copy is faster"? Does it mean setting dynamic_shape=True in 'DALIGenericIterator' and batch_size=1 in 'COCOReader'? Can box encoding be used in other detection networks that just need ground-truth boxes?

awolant commented 5 years ago

The current implementation of the iterator copies all of the samples using one copy call, as they are contiguous in memory. This is faster than issuing a copy for every sample, and a separate copy per sample is necessary when you want samples with different shapes in the batch.

You can set dynamic_shape=True and batch_size=1 to bypass this whole issue. Please keep in mind that processing batches this small might cause inferior performance.
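A minimal sketch of that workaround (COCOPipeline and epoch_size are assumed names from your own code):

# With batch_size=1 every batch is trivially dense, so the stock iterator works;
# dynamic_shape=True lets the tensor shapes change from iteration to iteration
pipe = COCOPipeline(batch_size=1, num_threads=4, device_id=0)
it = DALIGenericIterator([pipe], ["images", "boxes", "labels"],
                         size=epoch_size, dynamic_shape=True)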

Can box encoding be used in other detection networks which just need ground truth boxes

The BoxEncoder we have implemented is rather specific to SSD; you can read more about it in the SSD paper. This encoding will not work as-is if you need ground-truth boxes.

ZJU-lishuang commented 5 years ago

I just wrote a simple DALI iterator for PyTorch. It works for me, but the outputs are not friendly; it's not perfect. Is it useful for you?

import logging

import numpy as np
import torch

# Helpers from the stock PyTorch plugin: dtype mapping and the DALI -> torch copy
from nvidia.dali.plugin.pytorch import to_torch_type, feed_ndarray
from nvidia.dali.backend import TensorListGPU

class PyTorchIterator(object):
    """
    General DALI iterator for pyTorch. It can return any number of
    outputs from the DALI pipeline in the form of pyTorch's Tensors.

    Parameters
    ----------
    pipelines : list of nvidia.dali.pipeline.Pipeline
                List of pipelines to use
    output_map : list of str
                 List of strings which maps consecutive outputs
                 of DALI pipelines to user specified name.
                 Outputs will be returned from iterator as dictionary
                 of those names.
                 Each name should be distinct
    size : int
           Number of samples in the epoch (Usually the size of the dataset).
    auto_reset : bool, optional, default = False
                 Whether the iterator resets itself for the next epoch
                 or it requires reset() to be called separately.
    stop_at_epoch : bool, optional, default = False
                 Whether to return a fraction of a full batch of data
                 such that the total entries returned by the
                 iterator == 'size'. Setting this flag to False will
                 cause the iterator to return the first integer multiple
                 of self._num_gpus * self.batch_size which exceeds 'size'.
    dynamic_shape: bool, optional, default = False
                 Whether the shape of the output of the DALI pipeline can
                 change during execution. If True, the pytorch tensor will be resized accordingly
                 if the shape of DALI returned tensors changes during execution.
                 If False, the iterator will fail in case of change.
    """
    def __init__(self,
                 pipelines,
                 output_map,
                 size,
                 auto_reset=False,
                 stop_at_epoch=False,
                 dynamic_shape=False):
        assert pipelines is not None, "Number of provided pipelines has to be at least 1"
        if not isinstance(pipelines, list):
            pipelines = [pipelines]
        self._num_gpus = len(pipelines)
        self.batch_size = pipelines[0].batch_size
        self._size = int(size)
        self._auto_reset = auto_reset
        self._stop_at_epoch = stop_at_epoch
        self._dynamic_shape = dynamic_shape
        self._pipes = pipelines
        # Build all pipelines
        for p in self._pipes:
            p.build()
        # Use double-buffering of data batches
        self._data_batches = [[None, None] for i in range(self._num_gpus)]
        self._counter = 0
        self._current_data_batch = 0
        assert len(set(output_map)) == len(output_map), "output_map names should be distinct"
        self._output_categories = set(output_map)
        self.output_map = output_map

        # We need data about the batches (like shape information),
        # so we need to run a single batch as part of setup to get that info
        for p in self._pipes:
            p.schedule_run()
        self._first_batch = None
        self._first_batch = self.next()

    def __next__(self):
        if self._first_batch is not None:
            batch = self._first_batch
            self._first_batch = None
            return batch
        if self._counter >= self._size:
            if self._auto_reset:
                self.reset()
            raise StopIteration
        # Gather outputs
        outputs = []
        for p in self._pipes:
            outputs.append(p.share_outputs())
        for i in range(self._num_gpus):
            dev_id = self._pipes[i].device_id
            # initialize dict for all output categories
            category_outputs = dict()
            # segregate outputs into categories
            for j, out in enumerate(outputs[i]):
                category_outputs[self.output_map[j]] = out

            # Change DALI TensorLists into Tensors
            category_tensors = dict()
            category_shapes = dict()

            for category, out in category_outputs.items():
                if category not in category_tensors:
                    category_shapes[category] = []
                    category_tensors[category] = []
                category_tensors[category].append(out)
                j = len(category_tensors[category]) - 1
                category_shapes[category].append([])
                for k in range(len(category_tensors[category][j])):
                    category_shapes[category][j].append(category_tensors[category][j].at(k).shape())

            # If we did not yet allocate memory for that batch, do it now
            # TODO: allocate the maximum expected size in the iterator up front,
            # instead of reallocating every time the shapes change
            if self._data_batches[i][self._current_data_batch] is None:
                category_torch_type = dict()
                category_device = dict()
                torch_gpu_device = torch.device('cuda', dev_id)
                torch_cpu_device = torch.device('cpu')
                # check category and device
                for category in self._output_categories:
                    category_torch_type[category] = to_torch_type[np.dtype(category_tensors[category][0].at(0).dtype())]
                    if type(category_tensors[category][0]) is TensorListGPU:
                        category_device[category] = torch_gpu_device
                    else:
                        category_device[category] = torch_cpu_device

                pyt_tensors = dict()
                for category in self._output_categories:
                    pyt_tensors[category] = [[torch.zeros(shape,
                                                          dtype=category_torch_type[category],
                                                          device=category_device[category]) for shape in shape_list]
                                             for shape_list in category_shapes[category]]

                self._data_batches[i][self._current_data_batch] = pyt_tensors
            else:
                pyt_tensors = self._data_batches[i][self._current_data_batch]

            # Copy data from DALI Tensors to torch tensors
            for category, tensor in category_tensors.items():
                # Reallocate if the shapes changed since the last iteration
                if self._dynamic_shape and [[list(pyt_tensors[category][j][k].shape)
                        for k in range(len(pyt_tensors[category][j]))]
                        for j in range(len(pyt_tensors[category]))] != category_shapes[category]:
                    pyt_tensors[category] = [[torch.zeros(shape,
                                                          dtype=pyt_tensors[category][0][0].dtype,
                                                          device=pyt_tensors[category][0][0].device)
                                              for shape in shape_list]
                                             for shape_list in category_shapes[category]]

                # Copy every sample separately, as shapes may differ within the batch
                for j, b_list in enumerate(tensor):
                    for k in range(len(b_list)):
                        if pyt_tensors[category][j][k].shape[0] != 0:
                            feed_ndarray(b_list.at(k), pyt_tensors[category][j][k])

        for p in self._pipes:
            p.release_outputs()
            p.schedule_run()

        copy_db_index = self._current_data_batch
        # Change index for double buffering
        self._current_data_batch = (self._current_data_batch + 1) % 2
        self._counter += self._num_gpus * self.batch_size

        if (self._stop_at_epoch) and (self._counter > self._size):
            # First calculate how much data is required to return exactly self._size entries.
            diff = self._num_gpus * self.batch_size - (self._counter - self._size)
            # Figure out how many GPUs to grab from.
            numGPUs_tograb = int(np.ceil(diff/self.batch_size))
            # Figure out how many results to grab from the last GPU (as a fractional GPU batch may be required to
            # bring us right up to self._size).
            mod_diff = diff % self.batch_size
            data_fromlastGPU = mod_diff if mod_diff else self.batch_size

            # Grab the relevant data.
            # 1) Grab everything from the relevant GPUs.
            # 2) Grab the right data from the last GPU.
            # 3) Append data together correctly and return.
            output = [db[copy_db_index] for db in self._data_batches[0:numGPUs_tograb]]
            output[-1] = output[-1].copy()
            for category in self._output_categories:
                output[-1][category] = output[-1][category][0:data_fromlastGPU]
            return output

        return [db[copy_db_index] for db in self._data_batches]

    def next(self):
        """
        Returns the next batch of data.
        """
        return self.__next__()

    def __iter__(self):
        return self

    def reset(self):
        """
        Resets the iterator after the full epoch.
        DALI iterators do not support resetting before the end of the epoch
        and will ignore such request.
        """
        if self._counter >= self._size:
            if self._stop_at_epoch:
                self._counter = 0
            else:
                self._counter = self._counter % self._size
            for p in self._pipes:
                p.reset()
        else:
            logging.warning("DALI iterator does not support resetting while epoch is not finished. Ignoring...")
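A hypothetical usage of the iterator above (COCOPipeline is an assumed pipeline class; the nested output layout is what makes the outputs "not friendly"):

pipe = COCOPipeline(batch_size=16, num_threads=4, device_id=0)
it = PyTorchIterator([pipe], ["images", "boxes", "labels"],
                     size=pipe.epoch_size("Reader"))
for batch in it:
    per_gpu = batch[0]               # one dict per GPU/pipeline
    images = per_gpu["images"][0]    # list of per-sample torch tensors
    boxes = per_gpu["boxes"][0]      # shapes differ from sample to sample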

YunseokJANG commented 4 years ago

Hi, I have a question related to this.

My current pipeline output has samples whose shapes differ within a batch. Thus, currently, I get this error, as expected:

Assert on "this->IsDenseTensor()" failed: All tensors in the input TensorList must have the same shape and be densely packed.

In this case, is there a new iterator to iterate over the data? Or is the code in the very last answer the recommended solution as of now?

JanuszL commented 4 years ago

Hi @YunseokJANG, you can use the recently introduced Pad operator (though you still need to figure out where the data ends and the padding starts), or use the approach from the very last answer.
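A rough sketch of the Pad approach (the axis choice and fill value are assumptions; boxes and labels come from your reader inside the pipeline):

import nvidia.dali.ops as ops

# Pad every sample along axis 0 up to the largest sample in the batch, so the
# TensorList becomes dense; fill_value=-1 marks where the real data ends
pad = ops.Pad(device="cpu", axes=(0,), fill_value=-1)
boxes = pad(boxes)
labels = pad(labels)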

YunseokJANG commented 4 years ago

Hi @JanuszL, thanks for your kind reply. This API looks close to what I am looking for.

By the way, I have one additional question related to this. If I want to manipulate the data before feeding it to the model (adding a StartOfSentence or EndOfSentence token in NLP, for instance), do you recommend (or does this framework prefer) manipulating the data format within the Pipeline class (maybe with ops.PythonFunction ( eg1, eg2 )), or implementing it outside of the iterator? I am slightly biased toward the former approach, but I'd like to hear the best practices ;)

Regards, Yunseok

JanuszL commented 4 years ago

@YunseokJANG - using PythonFunction strips your pipeline of asynchronous and pipelined execution, and calling back into Python code from the pipeline imposes some overhead as well. This feature is mainly for prototyping and debugging, not for obtaining performance in a production environment. Doing it outside of the iterator would probably be faster; still, you can try both approaches (it should not be that difficult to switch from one to the other once you have the needed operation implemented).
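To make the trade-off concrete, a hedged sketch of both options (token ids, output names and dali_iter are assumptions):

import numpy as np
import nvidia.dali.ops as ops
import torch

SOS, EOS = 1, 2  # assumed special-token ids

# Option A: inside the pipeline - simple, but PythonFunction disables the
# asynchronous, pipelined execution (fine for prototyping, slower in production)
add_tokens = ops.PythonFunction(
    function=lambda t: np.concatenate(([SOS], t, [EOS])))

# Option B: outside the iterator, in the training loop - keeps the pipeline fast
sos, eos = torch.tensor([SOS]), torch.tensor([EOS])
for batch in dali_iter:
    sample = batch[0]["tokens"][0]   # assumed per-sample 1-D token tensor
    sample = torch.cat([sos, sample, eos])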

YunseokJANG commented 4 years ago

Oh, I see. Thanks for your comments :)