NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0

A different external Source for each run() #2292

Closed dariodematties closed 3 years ago

dariodematties commented 3 years ago

Hi,

In a recent support question I learned that I can use ExternalSource, for instance, to feed a controlled cropping process for each element in a batch of images.

For example, I can pass different, predetermined centers to crop each image in a batch.

Yet, following the example here, I see that I can only pass the arguments to the constructor that defines the graph. Consequently, the same crop centers will be used for each run() of the graph.

The fact is that I do not have all the crop centers for the whole dataset in advance. Instead, I will be provided with a new batch of centers after each run. Therefore, I cannot pass all the centers for the whole dataset to the constructor.

How can I pass a different batch of centers in each run as we obtain them? Can I do that using DALI?

I can provide my code if needed

Thanks!

klecki commented 3 years ago

Hi, you can pass either a function or an iterable as the source argument of the ExternalSource Operator. The function can:

More details in the doc of the operator here: https://docs.nvidia.com/deeplearning/dali/master-user-guide/docs/supported_ops.html#nvidia.dali.ops.ExternalSource

So you can have a function like:

import numpy as np

def generate_x(iter_id):
    return [np.int32(iter_id * 10 % 200)] * batch_size  # batch_size is defined by the caller

to drive the position of the centers by the iteration index. You can also write an iterator that generates different centers every time its __next__ is called or, I think, generate a list of centers and cycle over them.
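The iterator variant mentioned above could look roughly like the sketch below. It is plain Python with no DALI dependency; the class name `CenterBatches` and the sample values are hypothetical, and in a real pipeline the object would be passed as the `source` argument of ExternalSource.

```python
from itertools import cycle


class CenterBatches:
    """Hypothetical iterator: each next() returns the next batch of crop centers."""

    def __init__(self, batches):
        # cycle() restarts from the first batch once the list is exhausted.
        self._batches = cycle(batches)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._batches)


# Two illustrative batches of normalized x centers (batch_size = 3).
source = CenterBatches([[0.1, 0.2, 0.3], [0.7, 0.8, 0.9]])
first, second, third = next(source), next(source), next(source)
print(first, second, third)
```

Because the batches are cycled, the third call returns the first batch again. That is fine for a fixed list of centers, but not when the centers only become available at run time.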

dariodematties commented 3 years ago

Thank you for your response, @klecki. I imagine there must be a way to provide different data to the run() function at each iteration, but I cannot find the right way to do it.

This is the way I am introducing external data in the pipeline:

class ExternalInputIterator(object):
    def __init__(self, batch_size, vector):
        self.vector = vector
        self.batch_size = batch_size

    def __iter__(self):
        self.i = 0
        self.n = len(self.vector)
        return self

    def __next__(self):
        batch = []
        for _ in range(self.batch_size):
            element = self.vector[self.i]
            batch.append(element)
            self.i = (self.i + 1) % self.n
        return batch

class FoveatedRetinalProcessor(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus, crop_pos_x, crop_pos_y):
        super(FoveatedRetinalProcessor, self).__init__(batch_size, num_threads, device_id, seed = 15)
        self.input = ops.COCOReader(file_root = file_root, annotations_file = annotations_file,
                                     shard_id = device_id, num_shards = num_gpus, ratio=True, random_shuffle=False)
        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)

        self.resize_zero = ops.Resize(device = "gpu", resize_x = 640, resize_y = 640)
        self.resize_one  = ops.Resize(device = "gpu", resize_x = 30, resize_y = 30)

        self.crop_zero  = ops.Crop(device = "gpu", crop_h = 640, crop_w = 640)
        self.crop_one   = ops.Crop(device = "gpu", crop_h = 400, crop_w = 400)
        self.crop_two   = ops.Crop(device = "gpu", crop_h = 240, crop_w = 240)
        self.crop_three = ops.Crop(device = "gpu", crop_h = 100, crop_w = 100)
        self.crop_four  = ops.Crop(device = "gpu", crop_h = 30, crop_w = 30)

        self.source_crop_pos_x = ops.ExternalSource(source = crop_pos_x)
        self.source_crop_pos_y = ops.ExternalSource(source = crop_pos_y)

    def define_graph(self):
        inputs, bboxes, labels = self.input()
        images   = self.resize_zero(self.decode(inputs))

        crop_pos_x = self.source_crop_pos_x()
        crop_pos_y = self.source_crop_pos_y()

        cropped0 = self.crop_zero(images)
        cropped1 = self.crop_one(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped2 = self.crop_two(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped3 = self.crop_three(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped4 = self.crop_four(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)

        sized0   = self.resize_one(cropped0)
        sized1   = self.resize_one(cropped1)
        sized2   = self.resize_one(cropped2)
        sized3   = self.resize_one(cropped3)
        sized4   = self.resize_one(cropped4)

        return (cropped0, cropped1, cropped2, cropped3, cropped4, sized0, sized1, sized2, sized3, sized4)

And in a Jupyter notebook first I create the pipes:

crop_pos_x = NDP.ExternalInputIterator(batch_size, torch.rand(batch_size))
crop_pos_y = NDP.ExternalInputIterator(batch_size, torch.rand(batch_size))

start = time()
pipes = [NDP.FoveatedRetinalProcessor(batch_size=batch_size, num_threads=2, device_id=device_id, num_gpus=num_gpus, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)  for device_id in range(num_gpus)]
for pipe in pipes:
    pipe.build()
total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)

And finally, I run the pipes:

start = time()
pipe_out = [pipe.run() for pipe in pipes]
total_time = time() - start
print("Pipelines ran in %f seconds." % total_time)

crop_images_cpu0 = pipe_out[0][0].as_cpu()
crop_images_cpu1 = pipe_out[0][1].as_cpu()
crop_images_cpu2 = pipe_out[0][2].as_cpu()
crop_images_cpu3 = pipe_out[0][3].as_cpu()
crop_images_cpu4 = pipe_out[0][4].as_cpu()

sized_images_cpu0 = pipe_out[0][5].as_cpu()
sized_images_cpu1 = pipe_out[0][6].as_cpu()
sized_images_cpu2 = pipe_out[0][7].as_cpu()
sized_images_cpu3 = pipe_out[0][8].as_cpu()
sized_images_cpu4 = pipe_out[0][9].as_cpu()

And everything runs smoothly, but the problem is that each time I call pipe.run() it uses the same batch of centers: the ones I generated just before constructing the pipes:

crop_pos_x = NDP.ExternalInputIterator(batch_size, torch.rand(batch_size))
crop_pos_y = NDP.ExternalInputIterator(batch_size, torch.rand(batch_size))

What is the right way of passing a different batch of centers to pipe.run() each time? Take into account that I do not have all the batches of centers in advance before constructing the pipes. I obtain a new batch of centers after each pipe.run().

Thank you in advance, and excuse my long question.

klecki commented 3 years ago

You create two iterators that each store the batch of random numbers generated by torch.rand(batch_size) as self.vector. For every run(), DALI calls __next__ on your iterator, and it copies the same vector to the output.
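The repetition can be reproduced without DALI or PyTorch. The sketch below copies the iterator from the question and feeds it a plain list with exactly batch_size elements (the values are illustrative). Since the index wraps around after one batch, every call to next() yields the same data.

```python
class ExternalInputIterator:
    def __init__(self, batch_size, vector):
        self.vector = vector
        self.batch_size = batch_size

    def __iter__(self):
        self.i = 0
        self.n = len(self.vector)
        return self

    def __next__(self):
        batch = []
        for _ in range(self.batch_size):
            batch.append(self.vector[self.i])
            # With len(vector) == batch_size this wraps back to 0 after one batch.
            self.i = (self.i + 1) % self.n
        return batch


batch_size = 4
it = iter(ExternalInputIterator(batch_size, [0.1, 0.2, 0.3, 0.4]))
first = next(it)
second = next(it)
print(first == second)  # both batches contain the same four centers
```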

Something like this should work. I think we can handle PyTorch tensors. The gist of it is that you need to generate new numbers for every __next__:

def __next__(self):
    # Draw a fresh batch of centers on every call instead of reusing self.vector.
    return torch.rand(self.batch_size)

If you just want random numbers, I advise swapping that ExternalSource for the Uniform operator. Just specify the proper range and pass its output as crop_pos_x and crop_pos_y.

dariodematties commented 3 years ago

Thank you @klecki, I see your point. The problem is that this is just a test. That's why I use random numbers in the vectors.

In a real scenario such vectors would not be random, and I will need a way to introduce them into the pipeline for each call to pipe.run().

Do you see my problem?

When I run pipe.run() the first time, it brings the first batch (let's assume batch_size = 16) from MSCOCO (that is, images 0 to 15) and uses the 16 centers I provided to the constructor.

Then I run pipe.run() again, and it will bring COCO images 16 to 31, but it will use the same 16 centers in the cropping operations, and so on.

Suppose I get the 16 centers from another process or network after each pipe.run() and I want to use those centers in the next pipe.run().

How can I do that?

klecki commented 3 years ago

If you want to have some independent call to pass the data to ExternalSource, you probably should be able to use feed_input. Beware that the pipeline does some prefetching (by default if you call run() it starts 2 iterations, so you need to feed it ahead 2 times as well). I will check tomorrow if it's enough, I usually just generate the data on the fly in the callback I pass to source.

JanuszL commented 3 years ago

@dariodematties - are your crop parameters dependent on the data you get from the COCOReader operator? If so, I would recommend creating a custom operator that outputs them based on the input from the COCO reader. If not, and you want to obtain them from an external (independent) source, you can just add proper handling of this communication channel inside the __next__ method in ExternalInputIterator. It will just make the DALI pipeline wait for the data you are supposed to get.

JanuszL commented 3 years ago

@dariodematties if you could provide an example of how you would obtain the data you want to pass to the pipeline we can propose a solution that may suit you.

klecki commented 3 years ago

Adding to that, the feed_input would queue the data for the operator (as a synchronous call, at least for CPU data).

As I said, with the Pipeline set to async and pipelined (those are the default options), run() will schedule more than one iteration to fill the prefetch queue. So you need to provide a similar queue of inputs ahead of time for the external sources. If you pass your callback as the source parameter, it will be called as many times as needed; with feed_input you need to do it manually.

You can probably call feed_input prefetch_queue_depth times before the first run(), and then make an additional call to feed_input before every run().
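The feeding pattern described above can be illustrated with a toy model. This is not DALI code: `ToyPrefetchingPipeline` is a hypothetical stand-in that only mimics the bookkeeping (the first run() schedules prefetch_queue_depth iterations, each later run() schedules one more), under the assumption that scheduling an iteration consumes one queued input.

```python
from collections import deque


class ToyPrefetchingPipeline:
    """Toy stand-in for a prefetching pipeline; it does not use DALI."""

    def __init__(self, prefetch_queue_depth=2):
        self.depth = prefetch_queue_depth
        self.inputs = deque()     # batches queued via feed_input
        self.scheduled = deque()  # iterations already scheduled
        self.started = False

    def feed_input(self, batch):
        self.inputs.append(batch)

    def _schedule(self):
        # Scheduling an iteration consumes one queued input batch.
        if not self.inputs:
            raise RuntimeError("no input queued for the next iteration")
        self.scheduled.append(self.inputs.popleft())

    def run(self):
        # The first run fills the prefetch queue; later runs schedule one more.
        for _ in range(1 if self.started else self.depth):
            self._schedule()
        self.started = True
        return self.scheduled.popleft()


pipe = ToyPrefetchingPipeline(prefetch_queue_depth=2)
# Feed prefetch_queue_depth batches before the first run() ...
pipe.feed_input("centers_0")
pipe.feed_input("centers_1")
results = []
for step in range(2, 5):
    # ... and one more batch before every run().
    pipe.feed_input(f"centers_{step}")
    results.append(pipe.run())
print(results)
```

In this model the batch fed right before a run() only shows up in the output a couple of runs later, which matches the lag described for prefetching.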

dariodematties commented 3 years ago

Dear @JanuszL, it is really kind of you. Thank you, I think it would be really useful for me if you could post a code example here.

In the meantime, I read your comment on establishing a communication channel inside __next__, and I think it is working to some extent. I used a somewhat unorthodox strategy, though: global variables inside my custom library. This is the code:

global crop_pos_x
global crop_pos_y

class ExternalInputIterator(object):
    def __init__(self, batch_size):
        self.batch_size = batch_size

    def _get_vectors(self):
        self.vector1 = crop_pos_x
        self.vector2 = crop_pos_y

    def __iter__(self):
        self._get_vectors()
        assert len(self.vector1) == len(self.vector2)
        self.i = 0
        self.n = len(self.vector1)
        return self

    def __next__(self):
        batch1 = []
        batch2 = []
        self._get_vectors()
        for _ in range(self.batch_size):
            element1 = self.vector1[self.i]
            batch1.append(element1)
            element2 = self.vector2[self.i]
            batch2.append(element2)
            self.i = (self.i + 1) % self.n
        return batch1, batch2

class FoveatedRetinalProcessor(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus, crop_pos):
        super(FoveatedRetinalProcessor, self).__init__(batch_size, num_threads, device_id, seed = 15)
        self.input = ops.COCOReader(file_root = file_root, annotations_file = annotations_file,
                                     shard_id = device_id, num_shards = num_gpus, ratio=True, random_shuffle=False)
        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)

        self.resize_zero = ops.Resize(device = "gpu", resize_x = 640, resize_y = 640)
        self.resize_one  = ops.Resize(device = "gpu", resize_x = 30, resize_y = 30)

        self.crop_zero  = ops.Crop(device = "gpu", crop_h = 640, crop_w = 640)
        self.crop_one   = ops.Crop(device = "gpu", crop_h = 400, crop_w = 400)
        self.crop_two   = ops.Crop(device = "gpu", crop_h = 240, crop_w = 240)
        self.crop_three = ops.Crop(device = "gpu", crop_h = 100, crop_w = 100)
        self.crop_four  = ops.Crop(device = "gpu", crop_h = 30, crop_w = 30)

        self.source = ops.ExternalSource(source = crop_pos, num_outputs = 2)

    def define_graph(self):
        inputs, bboxes, labels = self.input()
        images   = self.resize_zero(self.decode(inputs))

        crop_pos_x, crop_pos_y = self.source()

        cropped0 = self.crop_zero(images)
        cropped1 = self.crop_one(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped2 = self.crop_two(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped3 = self.crop_three(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped4 = self.crop_four(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)

        sized0   = self.resize_one(cropped0)
        sized1   = self.resize_one(cropped1)
        sized2   = self.resize_one(cropped2)
        sized3   = self.resize_one(cropped3)
        sized4   = self.resize_one(cropped4)

        return (cropped0, cropped1, cropped2, cropped3, cropped4, sized0, sized1, sized2, sized3, sized4)

In the Jupyter notebook, I first construct the pipes:

sys.path.append(path + '/Colab Notebooks/Multimodal Active AI/NVIDIA DALI')
import NVIDIA_DALI_Pipelines as NDP

NDP.crop_pos_x = torch.repeat_interleave(torch.Tensor([0]), batch_size)
NDP.crop_pos_y = torch.repeat_interleave(torch.Tensor([0]), batch_size)

crop_pos = NDP.ExternalInputIterator(batch_size)

start = time()
pipes = [NDP.FoveatedRetinalProcessor(batch_size=batch_size, num_threads=2, device_id=device_id, num_gpus=num_gpus, crop_pos=crop_pos)  for device_id in range(num_gpus)]
for pipe in pipes:
    pipe.build()
total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)

Then I run them, and for each run I change the global variables, for instance from 0 to 1. This is the code in the notebook:

NDP.crop_pos_x = torch.repeat_interleave(torch.Tensor([0]), batch_size)
NDP.crop_pos_y = torch.repeat_interleave(torch.Tensor([0]), batch_size)

start = time()
pipe_out = [pipe.run() for pipe in pipes]
total_time = time() - start
print("Pipelines ran in %f seconds." % total_time)

crop_images_cpu0 = pipe_out[0][0].as_cpu()
crop_images_cpu1 = pipe_out[0][1].as_cpu()
crop_images_cpu2 = pipe_out[0][2].as_cpu()
crop_images_cpu3 = pipe_out[0][3].as_cpu()
crop_images_cpu4 = pipe_out[0][4].as_cpu()

sized_images_cpu0 = pipe_out[0][5].as_cpu()
sized_images_cpu1 = pipe_out[0][6].as_cpu()
sized_images_cpu2 = pipe_out[0][7].as_cpu()
sized_images_cpu3 = pipe_out[0][8].as_cpu()
sized_images_cpu4 = pipe_out[0][9].as_cpu()

But there is some funny behaviour that I guess has to do with what @klecki mentioned regarding prefetching. The thing is that I have to run the pipe twice in order to see the effect of the new centers in the outputs.

Thank you both, @klecki and @JanuszL for all your help. I think it would help me a lot if you could post a good example on how to achieve what I want to in the best possible way.

Regarding your question, @JanuszL: the centers come from the output of another NN; it could be an MLP, for instance.

And finally, regarding your suggestion about feed_input, @klecki: honestly, I do not quite understand how such a method could help me, and I have not been able to find good code examples using it on the web either.

JanuszL commented 3 years ago

Hi @dariodematties,

In your case, it seems you want to run another network, get the centers, and use them to drive DALI. One solution would be to run DALI in its own thread and make __next__ block until the new set of centers arrives. The Python-side DALI thread would then wait for the next batch of data. DALI outputs could be returned using a Python queue.
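A minimal sketch of the blocking-iterator idea, using only the Python standard library. The names `BlockingCenterIterator` and `fake_network` are hypothetical; `fake_network` stands in for the model that produces the centers, and in a real setup the iterator would be the `source` of an ExternalSource with two outputs.

```python
import queue
import threading


class BlockingCenterIterator:
    """Hypothetical iterator whose __next__ waits for the next batch of centers."""

    def __init__(self, center_queue):
        self.center_queue = center_queue

    def __iter__(self):
        return self

    def __next__(self):
        # Queue.get() blocks until the producer has put an (xs, ys) pair.
        xs, ys = self.center_queue.get()
        return xs, ys


def fake_network(center_queue, n_batches, batch_size):
    # Stand-in for the network that computes the crop centers.
    for step in range(n_batches):
        xs = [step * 0.25] * batch_size
        ys = [1.0 - step * 0.25] * batch_size
        center_queue.put((xs, ys))


centers = queue.Queue()
producer = threading.Thread(target=fake_network, args=(centers, 3, 4))
producer.start()

it = iter(BlockingCenterIterator(centers))
received = [next(it) for _ in range(3)]
producer.join()
print(received[0])
```

Here the consumer runs in the main thread and the producer in a worker thread. The arrangement described above is the mirror image (DALI in the worker thread), but the blocking handoff through the queue is the same.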

Another solution would be to set pipelining and asynchronous execution in the DALI pipeline to False, and prefetch_queue_depth to 1. Then there would be no prefetching, and you can do as you proposed in your example:

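# TensorX1/TensorY1 (and X2/Y2 below) are placeholders for each new batch of centers, not real torch functions
NDP.crop_pos_x = torch.repeat_interleave(torch.TensorX1([0]), batch_size)
NDP.crop_pos_y = torch.repeat_interleave(torch.TensorY1([0]), batch_size)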

# DALI would run the pipeline only once
pipe_out = [pipe.run() for pipe in pipes]

# get outputs for X1, Y1

NDP.crop_pos_x = torch.repeat_interleave(torch.TensorX2([0]), batch_size)
NDP.crop_pos_y = torch.repeat_interleave(torch.TensorY2([0]), batch_size)

# DALI would run the pipeline only once
pipe_out = [pipe.run() for pipe in pipes]

# get outputs for X2, Y2

Running network->DALI->network in a pipelined, efficient way would probably require the first approach with threads, but the second idea should allow you to test your workflow.
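For the second approach, the settings could be collected as keyword arguments, as in the fragment below. It assumes that the Pipeline constructor accepts `exec_pipelined`, `exec_async` and `prefetch_queue_depth`, which to my understanding are the keyword names for the options mentioned above. The dictionary name is hypothetical.

```python
# Keyword arguments that switch off pipelined/asynchronous execution and prefetching.
no_prefetch_kwargs = dict(
    exec_pipelined=False,    # run iterations one at a time
    exec_async=False,        # make run() synchronous
    prefetch_queue_depth=1,  # no batches prepared ahead of time
)

# Inside FoveatedRetinalProcessor.__init__ they would be forwarded like this:
# super().__init__(batch_size, num_threads, device_id, seed=15, **no_prefetch_kwargs)
print(no_prefetch_kwargs)
```

With these settings each run() should read the centers that were set just before it, at the cost of losing the overlap between data loading and computation.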

dariodematties commented 3 years ago

Hi @JanuszL, thank you for all your help. I did as you suggested and it finally worked!

And thank you @klecki!

You both really helped me to get this done.

Best!
Dario