libffcv / ffcv

FFCV: Fast Forward Computer Vision (and other ML workloads!)
https://ffcv.io
Apache License 2.0

Insert a transformation after the loader is constructed. #218

Open · dskhudia opened this issue 2 years ago

dskhudia commented 2 years ago

Adding a transformation to the pipeline after the loader object has been constructed results in the following error:

Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/workdisk/daya/composer/venv/lib/python3.8/site-packages/ffcv/loader/epoch_iterator.py", line 79, in run
    result = self.run_pipeline(b_ix, ixes, slot, events[slot])
  File "/workdisk/daya/composer/venv/lib/python3.8/site-packages/ffcv/loader/epoch_iterator.py", line 133, in run_pipeline
    result = code(*args)
  File "", line 2, in stage_1
  File "/workdisk/daya/composer/venv/lib/python3.8/site-packages/ffcv/transforms/ops.py", line 53, in to_device
    dst = dst[:inp.shape[0]]
TypeError: 'NoneType' object is not subscriptable

Reproduction script:

import ffcv
from dataclasses import replace
from typing import Callable, Optional, Tuple
import torch
from ffcv.fields.decoders import RandomResizedCropRGBImageDecoder
from ffcv.fields.decoders import IntDecoder
import numpy as np

from ffcv.pipeline.allocation_query import AllocationQuery
from ffcv.pipeline.operation import Operation
from ffcv.pipeline.state import State
from ffcv.pipeline.compiler import Compiler
from PIL import Image

IMAGENET_CHANNEL_MEAN = (0.485 * 255, 0.456 * 255, 0.406 * 255)
IMAGENET_CHANNEL_STD = (0.229 * 255, 0.224 * 255, 0.225 * 255)

image_pipeline = []

image_pipeline.extend(
    [
        RandomResizedCropRGBImageDecoder((224, 224)),
    ]
)

this_device = torch.device("cuda:0")
image_pipeline.extend(
    [
        ffcv.transforms.ToTensor(),
        ffcv.transforms.ToDevice(this_device, non_blocking=True),
        ffcv.transforms.ToTorchImage(),
        ffcv.transforms.NormalizeImage(
            np.array(IMAGENET_CHANNEL_MEAN), np.array(IMAGENET_CHANNEL_STD), np.float16
        ),
    ]
)

label_pipeline = [
    IntDecoder(),
    ffcv.transforms.ToTensor(),
    ffcv.transforms.Squeeze(),
    ffcv.transforms.ToDevice(this_device, non_blocking=True),
]

loader = ffcv.Loader(
    "/tmp/imagenet_train.ffcv",
    batch_size=32,
    num_workers=8,
    order=ffcv.loader.OrderOption.RANDOM,
    distributed=False,
    pipelines={"image": image_pipeline, "label": label_pipeline},
    batches_ahead=2,
    drop_last=True,
)

def add_ffcv_transform(dataloader: ffcv.Loader, transform: Callable, is_tensor_transform: bool = False):
    # Insert the new transform into the image pipeline of an already-constructed loader.
    ops = dataloader.pipelines["image"].operations
    insertion_index = len(ops)
    for i, t in enumerate(ops):
        if isinstance(t, ffcv.transforms.ToTensor):
            insertion_index = i
            break
        if is_tensor_transform:
            insertion_index += 1
    ops.insert(insertion_index, transform)
    # recompile the pipeline
    dataloader.generate_code()

add_ffcv_transform(loader, ffcv.transforms.RandomHorizontalFlip())

for x in loader:
    print(x[0].shape)
    break

andrewilyas commented 2 years ago

Hi! I think I misunderstood your question over Slack---inserting ops into the pipeline after you make the loader isn't supported, as far as I know. @GuillaumeLeclerc can correct me if I'm wrong, but otherwise I'll close this issue since this isn't really on the roadmap---you can always just make two loaders off of the same .ffcv file to achieve the same effect.
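
For reference, a rough sketch of that two-loader workaround, reusing the dataset path, device, and pipeline contents from the reproduction script above (the make_pipelines helper and the loader names are illustrative, not part of FFCV; fresh operation instances are built for each loader):

# Sketch of the "two loaders over the same .ffcv file" workaround. The
# make_pipelines helper and loader names are illustrative; fresh operation
# instances are created for each loader.
def make_pipelines(with_flip: bool):
    image_ops = [RandomResizedCropRGBImageDecoder((224, 224))]
    if with_flip:
        # Raw-array transforms go before ToTensor.
        image_ops.append(ffcv.transforms.RandomHorizontalFlip())
    image_ops += [
        ffcv.transforms.ToTensor(),
        ffcv.transforms.ToDevice(this_device, non_blocking=True),
        ffcv.transforms.ToTorchImage(),
        ffcv.transforms.NormalizeImage(
            np.array(IMAGENET_CHANNEL_MEAN), np.array(IMAGENET_CHANNEL_STD), np.float16
        ),
    ]
    label_ops = [
        IntDecoder(),
        ffcv.transforms.ToTensor(),
        ffcv.transforms.Squeeze(),
        ffcv.transforms.ToDevice(this_device, non_blocking=True),
    ]
    return {"image": image_ops, "label": label_ops}

plain_loader = ffcv.Loader(
    "/tmp/imagenet_train.ffcv",
    batch_size=32,
    num_workers=8,
    order=ffcv.loader.OrderOption.RANDOM,
    pipelines=make_pipelines(with_flip=False),
    drop_last=True,
)
augmented_loader = ffcv.Loader(
    "/tmp/imagenet_train.ffcv",
    batch_size=32,
    num_workers=8,
    order=ffcv.loader.OrderOption.RANDOM,
    pipelines=make_pipelines(with_flip=True),
    drop_last=True,
)

# Iterate over whichever loader matches the current phase of training.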

dskhudia commented 2 years ago

@andrewilyas Thanks a lot for the explanation. In https://github.com/mosaicml/composer, we have a set of data augmentation algorithms that can be applied flexibly at any time during training. By that point the dataloader is already constructed, so we have to modify its processing pipeline. Recreating the dataloader (or keeping a second one) is not an option without losing its current state.

Please consider adding this (modifying the processing pipeline after the dataloader is constructed) as a feature. I think most of the machinery (for example, recompilation of the pipeline) is already there in FFCV.

andrewilyas commented 2 years ago

@dskhudia I see---one option is to give the transformations an is_active flag, and skip the transformation when it's False. Then during training you can set tx.is_active = True or tx.is_active = False. As long as you make the loader with the recompile=True option, this should do what you want.
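
For context, the recompile option mentioned here is passed at loader construction. A minimal sketch, reusing the pipelines from the reproduction script (only the recompiling_loader name is new): with recompile=True, FFCV regenerates the pipeline code every epoch, so attribute changes such as the is_active flag described above can take effect.

# Sketch: the same loader as in the reproduction script, with recompile=True added
# so the pipeline code is regenerated every epoch and flag changes take effect.
recompiling_loader = ffcv.Loader(
    "/tmp/imagenet_train.ffcv",
    batch_size=32,
    num_workers=8,
    order=ffcv.loader.OrderOption.RANDOM,
    distributed=False,
    pipelines={"image": image_pipeline, "label": label_pipeline},
    batches_ahead=2,
    drop_last=True,
    recompile=True,
)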

dskhudia commented 2 years ago

Thanks. Does FFCV already respect the is_active flag?

andrewilyas commented 2 years ago

Hi, sorry for missing this! FFCV wouldn't see this is_active flag (you could call it anything); it would just be an attribute of the class. Inside the function returned by generate_code, you would check is_active and return the original image if it was False. Let me know if you need any further help here!
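
To illustrate the pattern being described (a sketch only; the ToggleableTransform class and its is_active attribute are made-up names, not part of FFCV), a custom Operation can read the flag inside generate_code and return the original images unchanged when it is inactive. Note that the flag's value is captured when the pipeline code is generated, which is why the loader needs recompile=True for mid-training changes to be picked up:

from dataclasses import replace
from typing import Callable, Optional, Tuple

from ffcv.pipeline.allocation_query import AllocationQuery
from ffcv.pipeline.compiler import Compiler
from ffcv.pipeline.operation import Operation
from ffcv.pipeline.state import State


class ToggleableTransform(Operation):
    """Sketch of a transform that can be switched on and off during training."""

    def __init__(self):
        super().__init__()
        self.is_active = False  # toggled from the training loop

    def generate_code(self) -> Callable:
        my_range = Compiler.get_iterator()
        is_active = self.is_active  # read when the pipeline code is (re)generated

        def apply(images, dst):
            if not is_active:
                # Inactive: return the original images untouched.
                return images
            for i in my_range(images.shape[0]):
                # Active: write the augmented version of images[i] into dst[i].
                dst[i] = images[i]  # placeholder, real augmentation goes here
            return dst

        apply.is_parallel = True
        return apply

    def declare_state_and_memory(
        self, previous_state: State
    ) -> Tuple[State, Optional[AllocationQuery]]:
        # Same shape and dtype out as in; request a scratch buffer to write into.
        return (
            replace(previous_state, jit_mode=True),
            AllocationQuery(previous_state.shape, previous_state.dtype),
        )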

dskhudia commented 2 years ago

@andrewilyas Thanks. No problem. I see how this can work for the transformations I write. I was wondering how to make it work for the existing FFCV transformations.

andrewilyas commented 2 years ago

@dskhudia I see, that makes sense---we'll consider working this into our next release, but for now my advice would be to accomplish what you want by subclassing the augmentation.
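
As an illustration of that subclassing route (again only a sketch: GatedRandomHorizontalFlip and its is_active attribute are invented here, and the flip body mirrors the upstream RandomHorizontalFlip rather than calling into it), only generate_code needs to be overridden, while declare_state_and_memory is inherited from the parent:

from numpy.random import rand

from ffcv.pipeline.compiler import Compiler
from ffcv.transforms import RandomHorizontalFlip


class GatedRandomHorizontalFlip(RandomHorizontalFlip):
    """RandomHorizontalFlip that can be switched on and off from the training loop."""

    def __init__(self, flip_prob: float = 0.5):
        super().__init__(flip_prob)
        self.is_active = False

    def generate_code(self):
        my_range = Compiler.get_iterator()
        flip_prob = self.flip_prob
        is_active = self.is_active  # captured when the pipeline is (re)compiled

        def flip(images, dst):
            if not is_active:
                # Gate is off: return the original images untouched.
                return images
            should_flip = rand(images.shape[0]) < flip_prob
            for i in my_range(images.shape[0]):
                if should_flip[i]:
                    dst[i] = images[i, :, ::-1]
                else:
                    dst[i] = images[i]
            return dst

        flip.is_parallel = True
        return flip
    # declare_state_and_memory is inherited from RandomHorizontalFlip.


# Usage sketch: place the gated transform before ToTensor, build the loader with
# recompile=True, and toggle the flag between epochs. image_pipeline[1:] and
# label_pipeline are reused from the reproduction script above for brevity.
gated_flip = GatedRandomHorizontalFlip()
gated_loader = ffcv.Loader(
    "/tmp/imagenet_train.ffcv",
    batch_size=32,
    num_workers=8,
    order=ffcv.loader.OrderOption.RANDOM,
    pipelines={
        "image": [RandomResizedCropRGBImageDecoder((224, 224)), gated_flip] + image_pipeline[1:],
        "label": label_pipeline,
    },
    drop_last=True,
    recompile=True,  # regenerate pipeline code each epoch so the flag is re-read
)

for epoch in range(10):
    gated_flip.is_active = epoch >= 5  # e.g. enable flipping only after a warmup
    for images, labels in gated_loader:
        pass  # training step goes here

The same gating idea should carry over to other existing FFCV transforms by subclassing them in the same way.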

dskhudia commented 2 years ago

Thanks @andrewilyas, eagerly waiting for the next release as it has some of the other improvements as well.