sucv / ABAW3

We achieved the 2nd and 3rd places in ABAW3 and ABAW5, respectively.

`facial_image_crop_by_landmark` is missing from preprocessing code #12

Open · sbelharbi opened this issue 6 days ago

sbelharbi commented 6 days ago

Hi, thanks for this code, very helpful. The preprocessing code you provided at https://drive.google.com/file/d/1_5HkqdQrf388JJvLAH1B_d7ctZLWy1KE/view?usp=drive_link is missing `facial_image_crop_by_landmark`. It is called in abaw5_preprocessing/base/preprocessing.py:

landmark_handler = facial_image_crop_by_landmark(**config_landmark)

inside the function `crop_align_face_fn`. Earlier in the file, the import of this function is commented out:

# from base.facial_landmark import facial_image_crop_by_landmark

I would like to use it to crop and align faces for a different dataset. Could you please provide the base.facial_landmark module, or point to where you got this preprocessing code?

thanks

sbelharbi commented 6 days ago

The import `from base.dataset import preprocess_video_dataset` also fails because the module is missing. Could you please provide base.dataset as well?

thanks

sbelharbi commented 6 days ago

There is also an import of something that does not exist: `import face_alignment` in base/preprocessing.py. It would be helpful if you could provide it.

thanks

sucv commented 1 day ago

> Could you please provide the base.facial_landmark module, or point to where you got this preprocessing code?

Here is the module:

import numpy as np
import cv2

def facial_landmark_template():
    # The 68-point facial landmark template in normalized coordinates
    # (jaw line, eyebrows, nose, eyes, and mouth).
    template = np.float32([
        (0.0792396913815, 0.339223741112), (0.0829219487236, 0.456955367943),
        (0.0967927109165, 0.575648016728), (0.122141515615, 0.691921601066),
        (0.168687863544, 0.800341263616), (0.239789390707, 0.895732504778),
        (0.325662452515, 0.977068762493), (0.422318282013, 1.04329000149),
        (0.531777802068, 1.06080371126), (0.641296298053, 1.03981924107),
        (0.738105872266, 0.972268833998), (0.824444363295, 0.889624082279),
        (0.894792677532, 0.792494155836), (0.939395486253, 0.681546643421),
        (0.96111933829, 0.562238253072), (0.970579841181, 0.441758925744),
        (0.971193274221, 0.322118743967), (0.163846223133, 0.249151738053),
        (0.21780354657, 0.204255863861), (0.291299351124, 0.192367318323),
        (0.367460241458, 0.203582210627), (0.4392945113, 0.233135599851),
        (0.586445962425, 0.228141644834), (0.660152671635, 0.195923841854),
        (0.737466449096, 0.182360984545), (0.813236546239, 0.192828009114),
        (0.8707571886, 0.235293377042), (0.51534533827, 0.31863546193),
        (0.516221448289, 0.396200446263), (0.517118861835, 0.473797687758),
        (0.51816430343, 0.553157797772), (0.433701156035, 0.604054457668),
        (0.475501237769, 0.62076344024), (0.520712933176, 0.634268222208),
        (0.565874114041, 0.618796581487), (0.607054002672, 0.60157671656),
        (0.252418718401, 0.331052263829), (0.298663015648, 0.302646354002),
        (0.355749724218, 0.303020650651), (0.403718978315, 0.33867711083),
        (0.352507175597, 0.349987615384), (0.296791759886, 0.350478978225),
        (0.631326076346, 0.334136672344), (0.679073381078, 0.29645404267),
        (0.73597236153, 0.294721285802), (0.782865376271, 0.321305281656),
        (0.740312274764, 0.341849376713), (0.68499850091, 0.343734332172),
        (0.353167761422, 0.746189164237), (0.414587777921, 0.719053835073),
        (0.477677654595, 0.706835892494), (0.522732900812, 0.717092275768),
        (0.569832064287, 0.705414478982), (0.635195811927, 0.71565572516),
        (0.69951672331, 0.739419187253), (0.639447159575, 0.805236879972),
        (0.576410514055, 0.835436670169), (0.525398405766, 0.841706377792),
        (0.47641545769, 0.837505914975), (0.41379548902, 0.810045601727),
        (0.380084785646, 0.749979603086), (0.477955996282, 0.74513234612),
        (0.523389793327, 0.748924302636), (0.571057789237, 0.74332894691),
        (0.672409137852, 0.744177032192), (0.572539621444, 0.776609286626),
        (0.5240106503, 0.783370783245), (0.477561227414, 0.778476346951)])

    template_min, template_max = np.min(template, axis=0), np.max(template, axis=0)
    template = (template - template_min) / (template_max - template_min)

    # Indices of inner eyes and bottom lip.
    key_indices = [39, 42, 57]

    # Indices of the outline.
    outline_indices = [*range(17), *range(26, 16, -1)]
    return template, key_indices, outline_indices

class facial_image_crop_by_landmark(object):
    def __init__(self, **config):
        # A column of ones appended to the landmarks so that they can be
        # warped by the 2x3 affine matrix in homogeneous coordinates.
        self.dummy = np.ones((config["landmark_number"], 1), dtype=np.float32)
        self.template, self.template_key_indices, self.template_outline_indices = facial_landmark_template()
        self.landmark_number = config['landmark_number']
        self.output_image_size = config['output_image_size']

    def crop_image(self, image, landmark):
        affine_matrix = self.get_affine_matrix(landmark)
        # aligned_landmark = self.align_landmark(landmark, affine_matrix)
        aligned_image = self.align_image(image, affine_matrix)
        return aligned_image

    def align_image(self, image, affine_matrix):
        r'''
        Warp the image by the defined affine transformation.
        :param image: (uint8 ndarray), the image to warp.
        :param affine_matrix: (float ndarray), the 2x3 affine matrix.
        :return: (uint8 ndarray), the aligned image.
        '''
        aligned_image = cv2.warpAffine(image, affine_matrix,
                                       (self.output_image_size,
                                        self.output_image_size))
        return aligned_image

    def align_landmark(self, landmark, affine_matrix):
        r"""
        Warp the landmark by the defined affine transformation.
        :param landmark: (float ndarray), the landmark to warp.
        :param affine_matrix: (float ndarray), the affine matrix.
        :return: (float ndarray), the aligned landmarks.
        """
        aligned_landmark = np.c_[landmark, self.dummy].dot(affine_matrix.T)
        return aligned_landmark

    def get_affine_matrix(self, landmark):
        r"""
        Calculate the affine matrix mapping the source to the target coordinates.
            Here, template_key_indices selects the three points (the inner eye
            corners and the bottom lip) passed to cv2.getAffineTransform.
        :param landmark: (float ndarray), the landmark to align.
        :return: (float ndarray), the 2x3 affine matrix.
        """
        source = np.asarray(landmark[self.template_key_indices], dtype=np.float32)
        target = np.asarray(self.template[self.template_key_indices] * self.output_image_size, dtype=np.float32)
        affine_matrix = cv2.getAffineTransform(source, target)
        return affine_matrix
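
A minimal usage sketch, assuming the class above is in scope; the image and the 68x2 landmark array are stand-ins for a real frame and real detector output, and the config values are illustrative:

import numpy as np

config_landmark = {"landmark_number": 68, "output_image_size": 48}
landmark_handler = facial_image_crop_by_landmark(**config_landmark)

image = np.zeros((128, 128, 3), dtype=np.uint8)              # stand-in for a real frame
landmark = (np.random.rand(68, 2) * 128).astype(np.float32)  # stand-in for detected landmarks

aligned = landmark_handler.crop_image(image, landmark)
print(aligned.shape)  # (48, 48, 3)
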
sucv commented 1 day ago

> Could you please provide base.dataset as well?

Here is the module:

import os
import random
from collections import OrderedDict
from operator import itemgetter

import numpy as np
from torch.utils.data import Dataset
from torchvision.transforms import transforms

from base.transforms3D import *
from base.utils import load_npy

class preprocess_video_dataset(Dataset):
    def __init__(self, video, config):
        self.transform = transforms.Compose([
            NumpyToPilImage(),
            transforms.Resize(config["image_size"]),
            transforms.CenterCrop(config["crop_size"]),
            transforms.ToTensor(),
            transforms.Normalize(config["mean"], config["std"])
        ])
        # The video frames; each frame is transformed into a normalized tensor on access.
        self.data_list = video

    def __getitem__(self, idx):
        image = self.data_list[idx]
        image = self.transform(image)
        return image

    def __len__(self):
        return len(self.data_list)

class PILImageDataset(Dataset):
    def __init__(self, pil_images, transform=None):
        self.pil_images = pil_images
        self.transform = transform

    def __len__(self):
        return len(self.pil_images)

    def __getitem__(self, idx):
        image = self.pil_images[idx]
        if self.transform:
            image = self.transform(image)
        return image

class GenericDataArranger(object):
    def __init__(self, dataset_info, dataset_path, debug, repeat=0):
        self.dataset_info = dataset_info
        self.debug = debug
        self.repeat = repeat
        self.trial_list = self.generate_raw_trial_list(dataset_path)
        self.partition_range = self.partition_range_fn()
        self.fold_to_partition = self.assign_fold_to_partition()

    def generate_iterator(self):
        iterator = self.dataset_info['partition']
        return iterator

    def generate_partitioned_trial_list(self, window_length, hop_length, fold, windowing=True):

        train_validate_range = self.partition_range['train'] + self.partition_range['validate']
        assert len(train_validate_range) == self.fold_to_partition['train'] + self.fold_to_partition['validate']

        partition_range = list(np.roll(train_validate_range, fold))
        partition_range += self.partition_range['test'] + self.partition_range['extra']
        partitioned_trial = {}

        for partition, num_fold in self.fold_to_partition.items():
            partitioned_trial[partition] = []

            for i in range(num_fold):
                fold_indices = partition_range.pop(0)
                trial_of_this_fold = list(itemgetter(*fold_indices)(self.trial_list))

                # itemgetter with a single index returns the item itself rather
                # than a tuple of items, so re-wrap it in a list.
                if len(fold_indices) == 1:
                    trial_of_this_fold = [trial_of_this_fold]

                for path, trial, length in trial_of_this_fold:
                    if not windowing:
                        window_length = length

                    windowed_indices = self.windowing(np.arange(length), window_length=window_length,
                                                      hop_length=hop_length, partition=partition)

                    for window in windowed_indices:
                        partitioned_trial[partition].append([path, trial, length, window])

        return partitioned_trial

    def calculate_mean_std(self, partitioned_trial):
        feature_list = self.get_feature_list()
        mean_std_dict = {partition: {feature: {'mean': None, 'std': None} for feature in feature_list} for partition in partitioned_trial.keys()}

        # Calculate the mean
        for feature in feature_list:
            for partition, trial_of_a_partition in partitioned_trial.items():
                lengths = 0
                sums = 0
                for path, _, _, _ in trial_of_a_partition:
                    data = load_npy(path, feature)
                    data = data.flatten()
                    lengths += len(data)
                    sums += data.sum()
                mean_std_dict[partition][feature]['mean'] = sums / (lengths + 1e-10)

        # Then calculate the standard deviation.
        for feature in feature_list:
            for partition, trial_of_a_partition in partitioned_trial.items():
                lengths = 0
                x_minus_mean_square = 0
                mean = mean_std_dict[partition][feature]['mean']
                for path, _, _, _ in trial_of_a_partition:
                    data = load_npy(path, feature)
                    data = data.flatten()
                    lengths += len(data)
                    x_minus_mean_square += np.sum((data - mean) ** 2)
                variance = x_minus_mean_square / (lengths - 1)
                mean_std_dict[partition][feature]['std'] = np.sqrt(variance)

        return mean_std_dict

    @staticmethod
    def partition_range_fn():
        raise NotImplementedError

    @staticmethod
    def assign_fold_to_partition():
        raise NotImplementedError

    @staticmethod
    def get_feature_list():
        feature_list = ['landmark', 'action_unit', 'mfcc', 'egemaps', 'vggish']
        return feature_list

    def generate_raw_trial_list(self, dataset_path):
        trial_path = os.path.join(dataset_path, self.dataset_info['data_folder'])

        trial_dict = OrderedDict({'train': [], 'validate': [], 'extra': [], 'test': []})
        for idx, partition in enumerate(self.generate_iterator()):

            if partition == "unused":
                continue

            if partition == "valid":
                partition = "validate"

            trial = self.dataset_info['trial'][idx]
            path = os.path.join(trial_path, str(trial))
            length = self.dataset_info['length'][idx]

            trial_dict[partition].append([path, trial, length])

        trial_list = []
        for partition, trials in trial_dict.items():
            trial_list.extend(trials)

        return trial_list

    def windowing(self, x, window_length, hop_length, partition):
        length = len(x)

        if length >= window_length:
            steps = (length - window_length) // hop_length + 1

            sampled_x = []
            for i in range(steps):
                start = i * hop_length
                end = start + window_length
                sampled_x.append(x[start:end])
                # To make full use of the test set, load each sample within a segment.
                if partition == "test" or partition == "extra":
                    for _ in range(self.repeat - 1):
                        sampled_x.append(x[start:end])

            # If the last window stops short of the trial end, append one more
            # window aligned to the end of the trial.
            if sampled_x[-1][-1] < length - 1:
                sampled_x.append(x[-window_length:])
                # To make full use of the test set, load each sample within a segment.
                if partition == "test" or partition == "extra":
                    for _ in range(self.repeat - 1):
                        sampled_x.append(x[-window_length:])
        else:
            sampled_x = [x]

        return sampled_x

class GenericDataset(Dataset):
    def __init__(self, data_list, continuous_label_dim, modality, multiplier, feature_dimension, window_length, mode, mean_std=None,
                 time_delay=0, load_whole_trial=0, repeat_on_testset=1):
        self.data_list = data_list
        self.continuous_label_dim = continuous_label_dim
        self.mean_std = mean_std
        self.time_delay = time_delay
        self.modality = modality
        self.multiplier = multiplier
        self.feature_dimension = feature_dimension
        self.load_whole_trial = load_whole_trial
        self.window_length = window_length
        self.mode = mode
        self.idx_for_test_sampling = 0
        self.repeat_on_testset = repeat_on_testset
        self.transform_dict = {}
        self.get_3D_transforms()

    def get_index_given_emotion(self):
        raise NotImplementedError

    def get_3D_transforms(self):
        normalize = GroupNormalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])

        if "video" in self.modality:
            if self.mode == 'train':
                self.transform_dict['video'] = transforms.Compose([
                    GroupNumpyToPILImage(0),
                    GroupScale(48),
                    GroupRandomCrop(48, 40),
                    GroupRandomHorizontalFlip(),
                    Stack(),
                    ToTorchFormatTensor(),
                    normalize
                ])
            else:
                self.transform_dict['video'] = transforms.Compose([
                    GroupNumpyToPILImage(0),
                    GroupScale(48),
                    GroupCenterCrop(40),
                    Stack(),
                    ToTorchFormatTensor(),
                    normalize
                ])

        for feature in self.modality:
            if "continuous_label" not in feature and "video" not in feature:
                self.transform_dict[feature] = self.get_feature_transform(feature)

    def get_feature_transform(self, feature):
        if "cnn" in feature or "backbone" in feature or "eeg_bandpower" in feature or "vggface" in feature or "vggish" in feature:
            transform = transforms.Compose([
                transforms.ToTensor()
            ])
        else:
            transform = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize(mean=[self.mean_std[feature]['mean']],
                                     std=[self.mean_std[feature]['std']])
            ])
        return transform

    def __getitem__(self, idx):
        path, trial, length, index = self.data_list[idx]
        examples = {}

        # For the test/extra partitions, cycle deterministically over the repeats;
        # for train/validate, a random offset is drawn later in get_example.
        x = self.idx_for_test_sampling
        if self.mode == "train" or self.mode == "validate":
            x = None

        for feature in self.modality:
            examples[feature] = self.get_example(path, length, index, feature, x)
        self.idx_for_test_sampling += 1
        if self.idx_for_test_sampling == self.repeat_on_testset:
            self.idx_for_test_sampling = 0

        # For trials shorter than the window, get_example zero-pads the example
        # to the window length, so return a matching full-length index.
        if len(index) < self.window_length:
            index = np.arange(self.window_length)

        return examples, str(trial), length, index

    def __len__(self):
        return len(self.data_list)

    def get_example(self, path, length, index, feature, x=None):

        if "cnn" in feature:
            multiplier = self.multiplier['cnn']
            feature_dimension = self.feature_dimension['cnn']
        else:
            multiplier = self.multiplier[feature]
            feature_dimension = self.feature_dimension[feature]
            if "label" in feature and len(self.continuous_label_dim) == 1:
                feature_dimension = (1,)

        if x is None:
            x = random.randint(0, multiplier - 1)

        if "continuous_label" in feature:
            x = 0

        random_index = index * multiplier + x

        # A trial may be shorter than the window, in which case zero padding is employed.
        if length < self.window_length:
            shape = (self.window_length,) + feature_dimension
            dtype = np.float32
            if feature == "video":
                dtype = np.int8
            example = np.zeros(shape=shape, dtype=dtype)
            example[index] = self.load_data(path, random_index, feature, feature_dimension)
        else:
            example = self.load_data(path, random_index, feature, feature_dimension)

        # Sometimes we may want to shift the label, so that the i-th label point
        # corresponds to the (i - time_delay)-th data point.
        if "continuous_label" in feature and self.time_delay != 0:
            example = np.concatenate(
                (example[self.time_delay:, :],
                 np.repeat(example[-1, :][np.newaxis], repeats=self.time_delay, axis=0)), axis=0)

        if "continuous_label" not in feature:
            example = self.transform_dict[feature](np.asarray(example, dtype=np.float32))

        return example

    def load_data(self, path, indices, feature, feature_dimension):
        filename = os.path.join(path, feature + ".npy")

        # For the test set, labels of zeros are generated as dummies.
        data = np.zeros(((len(indices),) + feature_dimension), dtype=np.float32)

        if os.path.isfile(filename):
            if self.load_whole_trial:
                data = np.load(filename, mmap_mode='c')
            else:
                data = np.load(filename, mmap_mode='c')[indices]

            if "continuous_label" in feature:
                data = self.processing_label(data)

        return data

    def processing_label(self, label):
        label = label[:, self.continuous_label_dim]
        if label.ndim == 1:
            label = label[:, None]
        return label
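
For context, a sketch of how preprocess_video_dataset can be wrapped in a DataLoader. The config values and the random video array below are illustrative assumptions (not the repo's defaults), and it presumes NumpyToPilImage from base.transforms3D accepts uint8 HxWxC frames:

import numpy as np
from torch.utils.data import DataLoader

config = {"image_size": 48, "crop_size": 40,
          "mean": [0.5, 0.5, 0.5], "std": [0.5, 0.5, 0.5]}

video = np.random.randint(0, 256, size=(16, 64, 64, 3), dtype=np.uint8)  # 16 dummy RGB frames

dataset = preprocess_video_dataset(video, config)
loader = DataLoader(dataset, batch_size=8, shuffle=False)

for batch in loader:
    print(batch.shape)  # torch.Size([8, 3, 40, 40])
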
sucv commented 1 day ago

> There is also an import of something that does not exist: `import face_alignment` in base/preprocessing.py.

`face_alignment` is not part of this repository; it is the face-alignment package (https://github.com/1adrianb/face-alignment). Install it with:

pip install face-alignment
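
For reference, a sketch of how the package can supply the 68-point landmarks expected by facial_image_crop_by_landmark above. The calls follow the face-alignment README; note that recent releases use LandmarksType.TWO_D while older ones use LandmarksType._2D, and "face.jpg" is a placeholder path:

import face_alignment
import numpy as np
from skimage import io

fa = face_alignment.FaceAlignment(face_alignment.LandmarksType.TWO_D, device="cpu")

image = io.imread("face.jpg")    # placeholder path, RGB image
preds = fa.get_landmarks(image)  # list of (68, 2) arrays, one per detected face, or None

if preds:
    landmark = preds[0].astype(np.float32)
    handler = facial_image_crop_by_landmark(landmark_number=68, output_image_size=48)
    aligned = handler.crop_image(image, landmark)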