NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0

Multi-label classification pipeline #1933

Open jpnavarro-nv opened 4 years ago

jpnavarro-nv commented 4 years ago

Just to confirm the current best approach for multi-label classification with DALI.

I'm following the instructions from #874 to create a multi-label loader, using the file_list approach to ingest an ID that points to a multi-hot encoding vector.

Is it necessary to implement an ExternalSource to accomplish this? When I try to use the dictionary of <indices, labels> inside define_graph directly, I get the following error:

File "train.py", line 64, in define_graph
    print((images, self.combinations(labels)))
TypeError: 'numpy.ndarray' object is not callable

What am I doing wrong?

My define_graph looks like this:

def define_graph(self):
    inputs, labels = self.reader(name="Reader")
    images = self.decode(inputs)
    if self.device == 'gpu':
        labels = labels.gpu()
    images = self.cmn(images)

    return (
        images,
        self.combinations(labels)
    )
awolant commented 4 years ago

Hi, thanks for the question. Can you share the code for the whole pipeline?

In principle, within the define_graph method you operate on nodes and edges of the pipeline graph (similar to the TensorFlow graph approach), so in your code labels does not contain the actual label values. To get them, you need to build and run the pipeline. ExternalSource lets you add externally provided data to the graph.
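To illustrate, a minimal sketch (the pipeline class name and constructor arguments are placeholders, not from your code) of how the values only become available once the pipeline is built and run:

# Inside define_graph, `labels` is just a graph edge; real values appear after build()/run().
pipe = MyPipeline(num_threads=4, path=data_path, device='cpu')  # hypothetical class/args
pipe.build()
images, labels = pipe.run()      # TensorLists holding actual data
labels_np = labels.as_array()    # now you can inspect the label values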

With a full code example, I'll be able to provide more specific help.

jpnavarro-nv commented 4 years ago

Hi @awolant, thanks for your reply. The graph explanation makes sense.

And sure, I can share the code. The structure is basically the one you proposed in issue #1852 (thanks for that!). The differences in my case are the data origin ('.jpg' images on disk) and the type of classification (multi-label).

I'm wondering if the suggestion presented in #874 is still valid, because if it is necessary to implement my own ExternalSource, there is no need for the file_list approach proposed in #874; I could manually read the CSV and implement the iteration accordingly.

In any case, here is my CSV structure. Original:

Path,any,class1,class2,class3,class4,class5
/workspace/images_cut_bsb/ID_7226189f1.png,0,0,0,0,0,0
/workspace/images_cut_bsb/ID_7dcf3ba2f.png,0,0,0,0,0,0
/workspace/images_cut_bsb/ID_707ca3182.png,1,0,1,0,0,0

Adjusted CSV (following #874):

Path,class_index
/workspace/images_cut_bsb/ID_c66808636.png 34
/workspace/images_cut_bsb/ID_028df4a84.png 33
/workspace/images_cut_bsb/ID_c17cacb50.png 0
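
For reference, a minimal sketch (file names and column handling are my own assumptions, not from #874) of how the adjusted file_list could be generated from the original CSV, mapping each multi-hot row to its index in the list of all 2**6 combinations:

import csv
import itertools

combinations = list(itertools.product((0, 1), repeat=6))

# hypothetical file names
with open('original.csv') as src, open('file_list.txt', 'w') as dst:
    for row in csv.DictReader(src):
        multi_hot = tuple(int(row[c]) for c in ('any', 'class1', 'class2', 'class3', 'class4', 'class5'))
        dst.write('%s %d\n' % (row['Path'], combinations.index(multi_hot)))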

Here is the code. Notice that define_graph is still the same.

import tensorflow as tf
import horovod.tensorflow.keras as hvd
from tensorflow.keras import optimizers
from ingestion_pipeline import get_classes_combinations

tf.compat.v1.disable_eager_execution()

# Horovod: initialize Horovod.
hvd.init()

import nvidia.dali.plugin.tf as dali_tf
import nvidia.dali as dali
import nvidia.dali.ops as ops
import nvidia.dali.types as types
from nvidia.dali.pipeline import Pipeline

import os

# Path to MNIST dataset
data_path = '/workspace/datasets/train_small'

TARGET = 0.8
BATCH_SIZE = 50
DROPOUT = 0.2
IMAGE_SIZE = 28
NUM_CLASSES = 10
HIDDEN_SIZE = 128
EPOCHS = 3
NUM_GPUS = hvd.local_size()
GLOBAL_BATCH_SIZE = BATCH_SIZE * NUM_GPUS
DATASET_SIZE = 60000
ITERATIONS = DATASET_SIZE // GLOBAL_BATCH_SIZE

# DALI pipeline definition
class MnistPipeline(Pipeline):
    def __init__(self, num_threads, path, device, device_id=0, shard_id=0, num_shards=1, seed=0):
        super(MnistPipeline, self).__init__(
            BATCH_SIZE, num_threads, device_id, seed)
        self.device = device
        self.reader = ops.FileReader(
            file_root=path, random_shuffle=True, shard_id=shard_id, num_shards=num_shards)
        self.decode = ops.ImageDecoder(
            device='mixed' if device == 'gpu' else 'cpu',
            output_type=types.GRAY)
        self.cmn = ops.CropMirrorNormalize(
            device=device,
            output_dtype=types.FLOAT,
            image_type=types.GRAY,
            mean=[0.],
            std=[255.],        
            output_layout="CHW")
        self.combinations = get_classes_combinations()

    def define_graph(self):
        inputs, labels = self.reader(name="Reader")
        images = self.decode(inputs)
        if self.device == 'gpu':
            labels = labels.gpu()
        images = self.cmn(images)

        return (
            images,
            self.combinations(labels)
        )

# Parameters settings
device = 'gpu'

# Parameters for DALI TF DATASET
shapes = (
    (BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE),
    (BATCH_SIZE, 1)
    )
dtypes = (
    tf.float32,
    tf.int32
    )

def dataset_options():
    options = tf.data.Options()
    try:
        options.experimental_optimization.apply_default_optimizations = False
        options.experimental_optimization.autotune = False   
    except:
        print('Could not set TF Dataset Options')

    return options

# Horovod: pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

with tf.device('/gpu:0'):
    mnist_pipeline = MnistPipeline(
        4, data_path, device, device_id=hvd.local_rank(), shard_id=hvd.local_rank(), num_shards=hvd.local_size())

    dataset = dali_tf.DALIDataset(
        pipeline=mnist_pipeline,
        batch_size=BATCH_SIZE,
        output_shapes=shapes,
        output_dtypes=dtypes,
        num_threads=4,
        device_id=0)

    dataset = dataset.with_options(dataset_options())

    mnist_model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE), name='images'),
        tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),
        tf.keras.layers.Dense(HIDDEN_SIZE, activation='relu'),
        tf.keras.layers.Dropout(DROPOUT),
        tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
    ])

    # Horovod: adjust learning rate based on number of GPUs.
    opt = optimizers.Adam(0.001 * hvd.size())

    # Horovod: add Horovod DistributedOptimizer.
    opt = hvd.DistributedOptimizer(opt)

    # Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
    # uses hvd.DistributedOptimizer() to compute gradients.
    mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
                        optimizer=opt,
                        metrics=['accuracy'],
                        experimental_run_tf_function=False)

    callbacks = [
        # Horovod: broadcast initial variable states from rank 0 to all other processes.
        # This is necessary to ensure consistent initialization of all workers when
        # training is started with random weights or restored from a checkpoint.
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),

        # Horovod: average metrics among workers at the end of every epoch.
        #
        # Note: This callback must be in the list before the ReduceLROnPlateau,
        # TensorBoard or other metrics-based callbacks.
        hvd.callbacks.MetricAverageCallback(),

        # Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
        # accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
        # the first three epochs. See https://arxiv.org/abs/1706.02677 for details.
        hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=3, verbose=1),
    ]

    # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
    # if hvd.rank() == 0:
    #     callbacks.append(tf.keras.callbacks.ModelCheckpoint('/tmp/checkpoint-{epoch}.h5'))

    # Horovod: write logs on worker 0.
    verbose = 0 if hvd.local_rank() > 0 else 1

    # Train the model.
    # Horovod: adjust number of steps based on number of GPUs.
    mnist_model.fit(dataset, steps_per_epoch=ITERATIONS // hvd.size(), callbacks=callbacks, epochs=EPOCHS, verbose=verbose)
awolant commented 4 years ago

Ok, thanks for the code. I don't see the implementation of get_classes_combinations(), but unless it returns a DALI op it will not work, as I explained before. Unfortunately, ExternalSource does not work with TensorFlow for now, so if you want to use Keras + Horovod, that solution is out of reach. As @JanuszL suggested in #874, you can either do this with file_list or transform your dataset to RecordIO or TFRecord and use the appropriate reader. If you want to follow the file_list route, the idea is to return this "id coded as label" from DALI and match the labels outside of the DALI pipeline, so your define_graph becomes simply:

def define_graph(self):
    inputs, labels = self.reader(name="Reader")
    images = self.decode(inputs)
    images = self.cmn(images)

    return (images, labels)

and then you manually match them outside. There is one thing, though: I'm not sure how to do this within the tf.data API while ensuring that the produced dataset outputs stay on the GPU; otherwise you might lose the performance benefits of using DALI. I'm afraid that any kind of tf.data map call would pull everything to the CPU (the outputs, not the DALI pipeline itself, but this results in a GPU->CPU->GPU round trip, which is usually a performance killer). Maybe you could write a bit of custom Keras code to do the matching manually there? That might be a way around it.
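
For what it's worth, a minimal sketch (not a ready solution, and subject to the GPU->CPU caveat above) of the "match labels outside DALI" step, using a constant lookup table and tf.gather:

import itertools
import numpy as np
import tensorflow as tf

# all 2**6 possible multi-hot vectors, indexed by the id stored in file_list
combinations = tf.constant(
    np.array(list(itertools.product((0, 1), repeat=6)), dtype=np.float32))

def match_labels(images, label_ids):
    # label_ids has shape (batch, 1); gather the corresponding multi-hot rows
    multi_hot = tf.gather(combinations, tf.reshape(label_ids, [-1]))
    return images, multi_hot

# Caveat from above: dataset.map may move the outputs to the CPU.
dataset = dataset.map(match_labels)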

Sorry to say it, but for this particular use case I don't see any clear, ready-to-use solution.

awolant commented 4 years ago

One additional question: how many labels per sample do you have in your data? Is this number the same for every sample, or does it differ? I'm asking because wrappers around DALI generally expect a constant output shape, so this would be something to consider as well.

jpnavarro-nv commented 4 years ago

Thanks for your reply @awolant .

The function get_classes_combinations() simply returns all possible vectors for a 6-way multi-hot encoding:


import itertools
import numpy as np

def get_classes_combinations():
    return np.array(list(itertools.product((0, 1), repeat=6)), dtype=np.int8)

I understand the limitations, and I'm sorry to hear that ExternalSource is not available for TF.

About the last question

How many labels per sample do you have in your data?

the number of labels per sample is constant, i.e., we always have a binary (1, 6) vector as output.

One last question: will converting the dataset to RecordIO or TFRecord allow me to treat the output as a multi-hot encoding vector? Do you have any example of this?

JanuszL commented 4 years ago

The function get_classes_combinations() simply returns all possible vectors for a 6x-way multi-hot encoding:

This won't work as you cannot do such an operation on the DALI graph edges.

One last question, converting the dataset to RecordIO or TFRecord will allow me to treat the output as a multi-hot encoding vector? Do you have any example around this?

We don't have such an example, but in the case of RecordIO you can encode any data as a label (not necessarily a single value); then, in DALI, your label would be a (batch_size, 1, 6) tensor. In the case of TFRecord you can encode it in the same way - just return it from the reader the same way you return the encoded image.
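
Not a full example, but a minimal sketch (feature names and paths are hypothetical) of how a 6-element multi-hot label could be read with the TFRecord reader, using the same ops API as above:

import nvidia.dali.ops as ops
import nvidia.dali.tfrecord as tfrec
import nvidia.dali.types as types
from nvidia.dali.pipeline import Pipeline

class TFRecordMultiLabelPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id=0):
        super(TFRecordMultiLabelPipeline, self).__init__(batch_size, num_threads, device_id)
        self.reader = ops.TFRecordReader(
            path='/workspace/data/train.tfrecord',   # hypothetical path
            index_path='/workspace/data/train.idx',  # index built with the tfrecord2idx tool
            features={
                'image/encoded': tfrec.FixedLenFeature((), tfrec.string, ""),
                # the multi-hot vector stored as 6 int64 values per sample
                'labels': tfrec.FixedLenFeature([6], tfrec.int64, 0),
            })
        self.decode = ops.ImageDecoder(device='mixed', output_type=types.GRAY)

    def define_graph(self):
        inputs = self.reader(name="Reader")
        images = self.decode(inputs['image/encoded'])
        # 'labels' comes out as a (batch_size, 6) multi-hot tensor
        return images, inputs['labels']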