libffcv / ffcv

FFCV: Fast Forward Computer Vision (and other ML workloads!)
https://ffcv.io
Apache License 2.0

FFCV slows down training considerably #150

Closed NotNANtoN closed 2 years ago

NotNANtoN commented 2 years ago

Hi dear FFCV team,

I guess I am trying something that is not yet fully supported:

  1. I want to train on text and images
  2. I am using pytorch lightning (PL).

But for both problems there seem to be solutions.

  1. Solution: save the text ASCII-encoded in an NDArrayField and then tokenize it before passing the input into the model.
  2. Solution: use some hacky hooks that were posted somewhere (I don't remember where, but the file name was custom_PTL_methods.py).

So, I am glad that I got FFCV running. For reference, these are my relevant code snippets.

Creating the .beton files:

class FFCVCLDataset(torch.utils.data.Dataset):
    def __init__(self, paths, labels, captions, max_len=2000):
        super().__init__()
        self.paths = paths
        self.labels = labels
        self.captions = captions
        self.max_len = max_len

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, i):
        img = np.array(Image.open(self.paths[i]).convert("RGB"))
        labels = np.uint8(self.labels[i])
        text = self.captions[i][-self.max_len:]  # delete first instead of last characters as the last sentences usually contain the finding
        # pad
        padded_text = text + (" " * (self.max_len - len(text)))
        padded_text = np.frombuffer(padded_text.encode("ascii", errors="replace").replace(b"?", b" "), dtype='uint8')
        return img, labels, padded_text

def ffcv_convert_dataset(dataset, write_path):
    os.makedirs("/".join(write_path.split("/")[:-1]), exist_ok=True)

    # Pass a type for each data field
    writer = DatasetWriter(write_path, {
        # Tune options to optimize dataset size, throughput at train-time
        'image': RGBImageField(max_resolution=256, jpeg_quality=90),
        'labels': NDArrayField(np.dtype('uint8'), (len(dataset.labels[0]),)),
        'text': NDArrayField(np.dtype('uint8'), (dataset.max_len,)),
        #'tokens': NDArrayField(np.dtype('int32'), (77,)),
    },
                          num_workers=0)
    # Write dataset
    writer.from_indexed_dataset(dataset)

The creation of the actual .beton files and of the pipelines (I left some transforms in there commented out, as it did not run with them included in the pipeline):

self.train_ds = FFCVCLDataset(train_paths, train_labels, train_texts, max_len=max_len)
self.val_ds = FFCVCLDataset(val_paths, val_labels, val_texts, max_len=max_len)
self.test_ds = FFCVCLDataset(test_paths, test_labels, test_texts, max_len=max_len)
if not os.path.exists(self.ffcv_ds_test):
    # convert them to .beton files 
    ffcv_convert_dataset(self.train_ds, self.ffcv_ds_train)
    ffcv_convert_dataset(self.val_ds, self.ffcv_ds_val)
    ffcv_convert_dataset(self.test_ds, self.ffcv_ds_test)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
mean = (0.48145466, 0.4578275, 0.40821073)
std = (0.26862954, 0.26130258, 0.27577711)

# create image pipelines
base_pipeline = [FFCVT.ToTensor(),
                 FFCVT.ToDevice(device, non_blocking=True),
                 FFCVT.ToTorchImage(),
                 FFCVT.Convert(torch.float16),
                 TF.Normalize(mean, std), # Normalize using image statistics
                 #FFCVT.ToDevice(device, non_blocking=True),
]
self.train_image_pipeline = [                
    RandomResizedCropRGBImageDecoder((224, 224)),
    #RandomHorizontalFlip(),
    FFCVT.Cutout(8, tuple(map(lambda x: int(x * 255), mean))),
    FFCVT.RandomTranslate(padding=10, fill=mean),
    #TF.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.0),
] + base_pipeline

self.test_image_pipeline = [
    CenterCropRGBImageDecoder((224, 224), 1),
] + base_pipeline

self.labels_pipeline = [NDArrayDecoder(), FFCVT.ToTensor()]#. FFCVT.ToDevice(device, non_blocking=True),]
self.train_text_pipeline = [NDArrayDecoder(), ]
                            #UintArrToText(), 
                            #SubsampleSents(self.sent_frac), 
                            #Tokenize()]#,FFCVT.ToDevice(device, non_blocking=True),]
self.test_text_pipeline = [NDArrayDecoder(), ]
                           #UintArrToText(), 
                           #Tokenize()]#, FFCVT.ToDevice(device, non_blocking=True),]

This is my dataloader:

ORDERING = OrderOption.QUASI_RANDOM
PIPELINES = {
  'image': self.train_image_pipeline,
  'labels': self.labels_pipeline,
  'text': self.train_text_pipeline,
}
loader = Loader(self.ffcv_ds_train,
    batch_size=self.bs,
    num_workers=16,
    order=ORDERING,
    pipelines=PIPELINES)

The last step, where the batch is actually used in the training step, along with the relevant transforms:

class UintArrToText(torch.nn.Module):
    def forward(self, x):
        return x.tobytes().decode('ascii').strip()

class Tokenize(torch.nn.Module):
    def forward(self, x):
        return clip.tokenize(x, truncate=True)

class SubsampleSents(torch.nn.Module):
    # samples a random subset of the sentences
    def __init__(self, frac, min_sent_len=5):
        super().__init__()
        self.frac = frac
        self.min_sent_len = min_sent_len

    def forward(self, x):
        sents = [sent.strip() for sent in x.split(".") if len(sent) > self.min_sent_len]
        num_sents_used = round(len(sents) * self.frac)
        sents_used = np.random.choice(sents, num_sents_used, replace=False)
        text = ". ".join(sents_used)
        return text

def training_step(self, batch, batch_idx):
        imgs, labels, tokenized_texts = batch

        if self.use_ffcv:
            from clip_utils import SubsampleSents
            tokenized_texts = torch.cat([Tokenize()(SubsampleSents(self.sent_frac)(UintArrToText()(t))).cuda() for t in tokenized_texts])

        loss, img_loss, text_loss, img_features, text_features, logits_per_image, logits_per_text = self.calc_loss(imgs, tokenized_texts)

        self.log('train_loss', loss, on_epoch=True, on_step=False, prog_bar=True, logger=True)
        lr = self.scheduler.get_last_lr()[0]
        self.log('lr', lr, on_epoch=False, on_step=True, prog_bar=False, logger=True)
        return loss

The results are quite bad, unfortunately: it takes 45 hours with FFCV and only 28 hours without. Proud-sunset is the FFCV run: [image]

Also, the performance is worse (maybe related to the ColorJitter now being missing?). A higher zero-shot validation accuracy is better: [image]

So I know that I left some transforms out of the pipeline (like FFCVT.ToDevice(device, non_blocking=True), which might be important), but it simply did not run with them in there and the error messages were completely cryptic. I also think that having the sentence subsampling and tokenization in the train_step is far from ideal.

I would be happy about any feedback on how I could improve this to actually get a speed-up!

GuillaumeLeclerc commented 2 years ago

Hello,

I see many issues with the code posted above:

But the most problematic thing is class SubsampleSents(torch.nn.Module). It is a pre-processing function written in Python (= extremely slow), it is single-core (so it is slower by a factor of however many cores your machine has), and it runs in the forward pass of your model. Without profiling your code I can tell that this is where at least 80% of the time is lost (and because this is single-threaded code, I'm sure you are not even using much of your CPU resources, let alone your GPU).

NotNANtoN commented 2 years ago

Hi, thanks a lot for the swift response and for looking into the code!

Referring to your points raised:

  1. The .beton file is only generated if it does not exist yet (via if not os.path.exists(path)), so this should not be a problem.
  2. PTL automatically sends the data to the correct device, but I think you are right that using the FFCV variant with non_blocking=True should be better. I have now changed that.
  3. I actually got that normalization pipeline from the FFCV docs here: https://docs.ffcv.io/ffcv_examples/cifar10.html, but I have now found the NormalizeImage function and am using it instead. Side note: it confuses me a bit that NormalizeImage seems to operate on tensors and yet takes numpy arrays for mean and std and a numpy dtype for the type argument.
  4. About the slow SubsampleSents - I fully agree; the problem is that I had quite some pain trying to implement it in the pipeline. I now understand that there always needs to be a np.array or tensor in the pipeline, whereas before I was outputting text. So I made a new transform that does the three previous transforms in one - I had to set jit_mode to False, otherwise it would not train:

    
class UintArrToToken(Operation):
    def __init__(self, sent_frac):
        self.sent_frac = sent_frac

    # Return the code to run this operation
    def generate_code(self):
        tokenize = Tokenizer()

        def uintarr_to_text(x):
            return x.tobytes().decode('ascii').strip()

        if self.sent_frac > 0:
            subsample = SubsampleSentsNp(self.sent_frac)

            def uint_to_token(x, dst):
                text = uintarr_to_text(x)
                text = subsample(text)
                tokens = tokenize(text)
                return tokens
        else:
            def uint_to_token(x, dst):
                text = uintarr_to_text(x)
                tokens = tokenize(text)
                return tokens

        return uint_to_token

    def declare_state_and_memory(self, previous_state):
        new_state = replace(previous_state, shape=(77,), dtype=np.int32, jit_mode=False)
        mem_allocation = AllocationQuery((77,), np.int32)
        return (new_state, mem_allocation)

class SubsampleSentsNp:
    def __init__(self, frac, min_sent_len=5):
        super().__init__()
        self.frac = frac
        self.min_sent_len = min_sent_len

    def __call__(self, x):
        sents = [sent.strip() for sent in x.split(".") if len(sent) > self.min_sent_len]
        num_sents_used = round(len(sents) * self.frac)
        sents_used = np.random.choice(sents, num_sents_used, replace=False)
        text = ". ".join(sents_used)
        return text

And my pipelines are now:

base_pipeline = [FFCVT.ToTensor(),
                 FFCVT.ToDevice(device, non_blocking=True),
                 FFCVT.ToTorchImage(),
                 FFCVT.NormalizeImage(mean, std, np.float16),
                 FFCVT.Convert(torch.float16),
                 FFCVT.ToDevice(device, non_blocking=True),
]

self.train_image_pipeline = [
    RandomResizedCropRGBImageDecoder((224, 224)),
    #RandomHorizontalFlip(),
    FFCVT.Cutout(8, tuple(map(lambda x: int(x * 255), mean))),
    FFCVT.RandomTranslate(padding=10, fill=mean),
    #TF.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.0),
] + base_pipeline

self.test_image_pipeline = [
    CenterCropRGBImageDecoder((224, 224), 1),
] + base_pipeline

# label pipeline
self.labels_pipeline = [NDArrayDecoder(), FFCVT.ToTensor(), FFCVT.ToDevice(device, non_blocking=True),]

# text pipeline
self.train_text_pipeline = [NDArrayDecoder(),
                            #UintArrToText(),
                            #SubsampleSents(self.sent_frac),
                            #Tokenize(),
                            UintArrToToken(self.sent_frac),
                            FFCVT.ToTensor(),
                            FFCVT.ToDevice(device, non_blocking=True),]


The good thing: it is now faster than before! With FFCV it runs at 4.38 it/s and without at 2.26 it/s. So thanks! But the bad things:

  1. I think this is not optimal, especially considering that I don't use the `jit` and that I cannot use the `ColorJitter`.
  2. Even worse: somehow the labels seem mixed up when I load them - for some classes I now have 0 positive examples in the labels. Do you have any idea how this could happen?

NotNANtoN commented 2 years ago

Btw, when I try to uncomment the TF.ColorJitter in my pipeline, I get the error below. Any clues there? Do I need to write a custom FFCV transform even for torchvision transforms? Should I make a separate issue?

Exception in thread Thread-1:                                                                                                                              
Traceback (most recent call last):                                                                                                                         
  File "/srv/home/8wiehe/miniconda3/envs/ffcv/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()                                                                                                                                             
  File "/srv/home/8wiehe/miniconda3/envs/ffcv/lib/python3.9/site-packages/ffcv/loader/epoch_iterator.py", line 79, in run                                  
    result = self.run_pipeline(b_ix, ixes, slot, events[slot])                                                                                             
  File "/srv/home/8wiehe/miniconda3/envs/ffcv/lib/python3.9/site-packages/ffcv/loader/epoch_iterator.py", line 133, in run_pipeline
    result = code(*args)
  File "/srv/home/8wiehe/miniconda3/envs/ffcv/lib/python3.9/site-packages/numba/core/dispatcher.py", line 468, in _compile_for_args
    error_rewrite(e, 'typing')
  File "/srv/home/8wiehe/miniconda3/envs/ffcv/lib/python3.9/site-packages/numba/core/dispatcher.py", line 409, in error_rewrite
    raise e.with_traceback(None)
numba.core.errors.TypingError: Failed in nopython mode pipeline (step: nopython frontend)
Failed in nopython mode pipeline (step: nopython frontend)
Untyped global name 'self': Cannot determine Numba type of <class 'ffcv.transforms.module.ModuleWrapper'>

File "../miniconda3/envs/ffcv/lib/python3.9/site-packages/ffcv/transforms/module.py", line 25:
        def apply_module(inp, _):
            res = self.module(inp)
            ^

During: resolving callee type: type(CPUDispatcher(<function ModuleWrapper.generate_code.<locals>.apply_module at 0x7f593b13cd30>))
During: typing of call at  (2)

During: resolving callee type: type(CPUDispatcher(<function ModuleWrapper.generate_code.<locals>.apply_module at 0x7f593b13cd30>))
During: typing of call at  (2)

File "/srv/home/8wiehe/master_thesis", line 2:
<source missing, REPL/exec in use?>
GuillaumeLeclerc commented 2 years ago

  1. I recommend you do your tokenization before creating the .beton file; that is going to be much faster. Right now you can't use JIT compilation because your tokenization isn't written in numpy. I don't know your pipeline exactly, but you can probably do your "subsampling" (whatever it is) on numpy arrays that have already been tokenized, no? (See the sketch after this list.) About ColorJitter: there has been work towards an FFCV-native implementation (see the pull requests). Hopefully it lands in v1.0.
  2. Can you elaborate on that in a new issue? That sounds like a bug.
  3. Can you create a new issue with the complete pipeline?
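
A minimal sketch of this pre-tokenization idea, assuming the clip.tokenize call already used in training_step and the (77,) int32 token field that is commented out in the DatasetWriter above; the class name PreTokenizedCLDataset is made up for illustration:

import clip
import numpy as np
import torch
from PIL import Image

class PreTokenizedCLDataset(torch.utils.data.Dataset):
    # Hypothetical variant of FFCVCLDataset that stores CLIP tokens instead of raw text.
    def __init__(self, paths, labels, captions):
        super().__init__()
        self.paths = paths
        self.labels = labels
        self.captions = captions

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, i):
        img = np.array(Image.open(self.paths[i]).convert("RGB"))
        labels = np.uint8(self.labels[i])
        # Tokenize once, at .beton creation time, instead of in every training_step.
        tokens = clip.tokenize(self.captions[i], truncate=True)[0]  # shape (77,)
        return img, labels, tokens.numpy().astype(np.int32)

The writer field would then be something like 'tokens': NDArrayField(np.dtype('int32'), (77,)), and the text pipeline could reduce to [NDArrayDecoder(), FFCVT.ToTensor(), FFCVT.ToDevice(device, non_blocking=True)]; any sentence subsampling would then have to operate on the token arrays instead of on strings.
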
NotNANtoN commented 2 years ago

So there is one thing I don't understand about FFCV and dataloaders in general: how can it be a bottleneck? If we use 16 workers for a batch size of 16, then each worker only needs to load one image, apply some minor transforms to it, collect them into a batch and send the batch to the GPU. So it can only be a bottleneck if these operations take longer than the forward+backward pass, right?

Anyway, I think I'm giving up. I tried working with the text tokenized in the beton file, but in the end my speed-up is minor; the training runs, but the results are gibberish and my labels are mixed up. Which sucks, because I spent quite a few days trying to get this to work. Thanks anyway for your feedback!

GuillaumeLeclerc commented 2 years ago

Many reasons why it is a bottleneck:

Basically, in this situation you are limiting yourself to a single worker and running Python code, which is orders of magnitude slower than compiled code.
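
As a rough sketch of that gap, assuming only numpy and numba (the array size and the scale factor are arbitrary), the same element-wise loop can be timed once interpreted and once JIT-compiled:

import time
import numpy as np
from numba import njit

def scale_py(x, out):
    # plain Python loop, runs in the interpreter
    for i in range(x.shape[0]):
        out[i] = x[i] * 0.5
    return out

@njit
def scale_jit(x, out):
    # same loop, compiled to machine code by numba
    for i in range(x.shape[0]):
        out[i] = x[i] * 0.5
    return out

x = np.random.rand(10_000_000).astype(np.float32)
out = np.empty_like(x)

scale_jit(x, out)  # warm-up call so compilation time is not measured

t0 = time.time(); scale_py(x, out); t_py = time.time() - t0
t0 = time.time(); scale_jit(x, out); t_jit = time.time() - t0
print(f"python: {t_py:.2f}s  numba: {t_jit:.4f}s")

FFCV compiles its pipeline operations with numba in much the same way when jit_mode is enabled, which is why transforms that fall back to interpreted Python (or operate on strings) hurt so much.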

Hope this helps clarify things!

GuillaumeLeclerc commented 2 years ago

Closing due to inactivity