crj1998 / simplified_dfl

0 stars 0 forks source link

face tracking #2

Open crj1998 opened 1 month ago

crj1998 commented 1 month ago

Reference

import cv2
import argparse
import numpy as np
from tqdm import tqdm
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment

from facex.detection import Detector

def iou(bb_test, bb_gt):
    """
    Computes IoU between two bboxes in the form [x1,y1,x2,y2].

    The coordinates may be plain scalars or size-1 arrays; the result has
    the matching form.

    Returns 0 instead of NaN when the union area is not positive (for
    example when both boxes have zero area), so the value can always be
    placed into an assignment cost matrix.
    """
    xx1 = np.maximum(bb_test[0], bb_gt[0])
    yy1 = np.maximum(bb_test[1], bb_gt[1])
    xx2 = np.minimum(bb_test[2], bb_gt[2])
    yy2 = np.minimum(bb_test[3], bb_gt[3])
    # Clamp at zero so disjoint boxes give an empty intersection.
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    wh = w * h
    area_test = (bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1])
    area_gt = (bb_gt[2] - bb_gt[0]) * (bb_gt[3] - bb_gt[1])
    union = area_test + area_gt - wh
    valid = union > 0
    # Divide by a dummy 1 where the union is degenerate to avoid 0/0.
    safe_union = np.where(valid, union, 1.)
    o = np.where(valid, wh / safe_union, 0.)
    # `[()]` turns a 0-d array back into a scalar and leaves arrays as-is.
    return o[()]

def linear_assignment(cost_matrix):
    """Solve the minimum-cost assignment for ``cost_matrix``.

    Returns an integer array of shape (k, 2) whose rows are
    ``[row_index, col_index]`` pairs. The result is always 2-D, including
    shape (0, 2) when nothing is assigned, so callers can index columns
    with ``[:, 0]`` / ``[:, 1]`` unconditionally.
    """
    rows, cols = linear_sum_assignment(cost_matrix)
    return np.column_stack((rows, cols))

def associate_detections_to_trackers(detections, trackers, iou_threshold = 0.3):
    """(numpy.array, numpy.array, float) -> numpy.array, numpy.array, numpy.array

    Pairs detections with trackers by maximising total IoU, then rejects
    pairs whose IoU is below ``iou_threshold``.

    Returns 3 arrays: matches as rows of [detection_index, tracker_index]
    with shape (k, 2), unmatched_detections and unmatched_trackers as 1-D
    index arrays. If either input is empty, everything on the other side is
    reported as unmatched.
    """
    num_dets, num_trks = len(detections), len(trackers)
    no_matches = np.empty((0, 2), dtype=int)

    if num_trks == 0:
        return no_matches, np.arange(num_dets), np.empty((0,), dtype=int)
    if num_dets == 0:
        # Nothing to assign; the solver output could not be column-indexed.
        return no_matches, np.empty((0,), dtype=int), np.arange(num_trks)

    iou_matrix = np.zeros((num_dets, num_trks), dtype=np.float32)
    for d, det in enumerate(detections):
        for t, trk in enumerate(trackers):
            iou_matrix[d, t] = iou(det, trk)

    # Negate so that the minimum-cost assignment maximises IoU.
    matched_indices = linear_assignment(-iou_matrix)

    assigned_dets = set(matched_indices[:, 0].tolist())
    assigned_trks = set(matched_indices[:, 1].tolist())
    unmatched_detections = [d for d in range(num_dets) if d not in assigned_dets]
    unmatched_trackers = [t for t in range(num_trks) if t not in assigned_trks]

    matches = []
    for d, t in matched_indices:
        if iou_matrix[d, t] < iou_threshold:
            # Assigned by the solver but overlapping too little to count.
            unmatched_detections.append(d)
            unmatched_trackers.append(t)
        else:
            matches.append((d, t))

    matches = np.array(matches, dtype=int).reshape(-1, 2)
    return (
        matches,
        np.array(unmatched_detections, dtype=int),
        np.array(unmatched_trackers, dtype=int),
    )

class KalmanTracker(object):
    """One tracked box backed by a 7-state / 4-measurement Kalman filter.

    The first four state entries are initialised from the box passed to the
    constructor; the transition matrix adds state entries 4..6 to entries
    0..2 on every prediction step. Each instance takes the next value of
    the class-wide ``counter`` as its ``id``.
    """
    counter = 1

    def __init__(self, dets):
        kf = KalmanFilter(dim_x=7, dim_z=4)

        # Identity transition plus couplings 0<-4, 1<-5, 2<-6.
        transition = np.eye(7, dtype=int)
        for pos in range(3):
            transition[pos, pos + 4] = 1
        kf.F = transition
        # Only the first four state entries are observed.
        kf.H = np.eye(4, 7, dtype=int)

        kf.R[2:, 2:] *= 10.
        kf.P[4:, 4:] *= 1000.
        kf.P *= 10.
        kf.Q[-1, -1] *= 0.01
        kf.Q[4:, 4:] *= 0.01
        kf.x[:4] = np.array([dets[i] for i in range(4)]).reshape((4, 1))

        self.kf = kf
        self.id = KalmanTracker.counter
        KalmanTracker.counter += 1

    def __call__(self):
        """Advance the filter one step and return the predicted state."""
        state = self.kf.x
        if state[6] + state[2] <= 0:
            # Zero state entry 6 so the prediction of entry 2 stays put.
            state[6] *= 0.0
        self.kf.predict()
        return self.kf.x

    def correction(self, measurement):
        """Fold ``measurement`` into the filter state."""
        self.kf.update(measurement)

    def get_current_x(self):
        """Return a copy of the first four state entries, shaped (1, 4)."""
        return np.array(self.kf.x[:4]).reshape((1, 4))

class FaceTracker(object):
    """Maintains a set of KalmanTracker objects across video frames.

    Calling the instance with one frame's detections (rows whose first four
    columns are the box) updates the tracker set and returns an array of
    shape (n, 5): the four box state entries of every live tracker followed
    by its id. Trackers left unmatched in a frame are dropped immediately.
    """

    def __init__(self):
        self.current_trackers = []

    def _spawn(self, detection):
        """Create a tracker for ``detection`` and register it."""
        tracker = KalmanTracker(detection[:4])
        # The first correction uses integer-truncated coordinates.
        measurement = np.array(
            [[int(detection[0])], [int(detection[1])],
             [int(detection[2])], [int(detection[3])]],
            np.float32,
        )
        tracker.correction(measurement)
        self.current_trackers.append(tracker)

    def _snapshot(self):
        """Return the (n, 5) [box..., id] array for the live trackers."""
        rows = [
            np.concatenate((trk.get_current_x()[0], [trk.id])).reshape(1, -1)
            for trk in self.current_trackers
        ]
        if rows:
            return np.concatenate(rows)
        return np.empty((0, 5))

    def __call__(self, detections):
        detections = np.asarray(detections)

        if len(detections) == 0:
            # No detections: every tracker is unmatched and therefore
            # dropped, and there is nothing to associate.
            self.current_trackers = []
            return np.empty((0, 5))

        if not self.current_trackers:
            for detection in detections:
                self._spawn(detection)
            return self._snapshot()

        predicted_trackers = np.asarray(
            [tracker()[:4] for tracker in self.current_trackers]
        )

        matched, unmatched_detections, unmatched_trackers = associate_detections_to_trackers(detections[:, :4], predicted_trackers)

        for t, tracker in enumerate(self.current_trackers):
            if t not in unmatched_trackers:
                d = matched[np.where(matched[:, 1] == t)[0], 0]
                tracker.correction(
                    np.array([detections[d, 0], detections[d, 1],
                              detections[d, 2], detections[d, 3]]).reshape((4, 1))
                )

        # New trackers go on the end, so the indices in
        # unmatched_trackers still point at the original entries.
        for i in unmatched_detections:
            self._spawn(detections[i])

        # Delete from the back so earlier indices stay valid.
        for index in sorted(unmatched_trackers, reverse=True):
            del self.current_trackers[index]

        return self._snapshot()

def read_detect_track_faces(videopath, facedetector, output_path="/root/track.avi"):
    """Detect and track faces in a video and write an annotated copy.

    Args:
        videopath: path of the input video, passed to ``cv2.VideoCapture``.
        facedetector: callable taking an RGB frame and returning a pair
            whose first item, indexed with ``[0]``, supports
            ``.cpu().numpy()`` and yields the detection rows.
        output_path: destination of the XVID-encoded result video.

    Each frame is written as soon as it has been annotated, so memory use
    does not grow with video length. The capture and the writer are
    released even if processing fails.
    """
    facetracker = FaceTracker()
    videocapture = cv2.VideoCapture(videopath)
    try:
        fps = int(videocapture.get(cv2.CAP_PROP_FPS))
        resolution = (int(videocapture.get(cv2.CAP_PROP_FRAME_WIDTH)), int(videocapture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        num_frames = int(videocapture.get(cv2.CAP_PROP_FRAME_COUNT))
        print("fps:", fps)
        print("resolution:", resolution)
        print("num frames:", num_frames)

        # One colour per track id, reused modulo 32.
        colours = np.random.rand(32, 3)
        colours = np.clip(colours*255, 0, 255).astype(int)

        fourcc = "XVID"
        writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*fourcc), fps, resolution)
        # Fall back to an open-ended bar if the frame count is not positive.
        pbar = tqdm(total=num_frames if num_frames > 0 else None, ncols=100)
        try:
            while True:
                success, frame = videocapture.read()
                if not success:
                    break
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                faces, _ = facedetector(frame)
                faces = faces[0].cpu().numpy()

                trackers = facetracker(faces)

                for tracker in trackers:
                    # Plain Python ints: cv2 point arguments are tuples.
                    x1, y1, x2, y2, track_id = (int(v) for v in tracker[:5])
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color=colours[track_id % 32].tolist(), thickness=2)

                writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
                pbar.update(1)
        finally:
            pbar.close()
            writer.release()
    finally:
        videocapture.release()

def parse_args():
    """Parse the command line; the result exposes ``videopath``."""
    cli = argparse.ArgumentParser(description='Tracking Arguments')
    cli.add_argument('--videopath', help='Input Video Path')
    return cli.parse_args()

if __name__ == "__main__":
    # Script entry point: read the video path from the command line, build
    # a default Detector and run the detect/track/write pipeline.
    cli_args = parse_args()
    read_detect_track_faces(cli_args.videopath, Detector())
crj1998 commented 1 month ago

Reference:

import cv2
from argparse import ArgumentParser
from tqdm import tqdm
from facex.detection import Detector

# Module-level detector instance; process_video() calls it on each frame.
facedetector = Detector()

def IoU(boxA, boxB):
    """Return the intersection-over-union of two boxes.

    Boxes are indexable as (x1, y1, x2, y2). Widths and heights are
    computed as ``hi - lo + 1``.
    """
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[2], boxB[2])
    bottom = min(boxA[3], boxB[3])

    # Clamp each side at zero so disjoint boxes overlap by nothing.
    overlap_w = max(0, right - left + 1)
    overlap_h = max(0, bottom - top + 1)
    overlap = overlap_w * overlap_h

    def area(box):
        return (box[2] - box[0] + 1) * (box[3] - box[1] + 1)

    union = area(boxA) + area(boxB) - overlap
    return overlap / float(union)

def join(tube_bbox, bbox):
    """Return the smallest (x1, y1, x2, y2) tuple enclosing both boxes.

    Only the first four entries of each argument are read.
    """
    # min for the top-left corner, max for the bottom-right corner.
    pickers = (min, min, max, max)
    return tuple(pick(tube_bbox[k], bbox[k]) for k, pick in enumerate(pickers))

def compute_bbox(start, end, fps, tube_bbox, frame_shape, inp, image_shape, increase_area=0.4):
    """Build the ffmpeg command that crops one trajectory out of ``inp``.

    Args:
        start, end: first and last frame index of the trajectory.
        fps: frame rate used to convert frame indices to seconds.
        tube_bbox: box whose first four entries are (left, top, right,
            bottom); any further entries are ignored.
        frame_shape: (height, width, ...) of the video, used for clamping.
        inp: input path; shell-quoted in the returned command.
        image_shape: pair of output dimensions, emitted as
            ``scale=<first>:<second>``.
        increase_area: minimum relative padding added on every side.

    Returns:
        The ffmpeg command line as a single string.
    """
    # Local import: quoting keeps paths with spaces or shell metacharacters
    # from breaking (or injecting into) the generated command.
    import shlex

    left, top, right, bot = tube_bbox[:4]
    width = right - left
    height = bot - top

    #Computing aspect preserving bbox
    width_increase = max(increase_area, ((1 + 2 * increase_area) * height - width) / (2 * width))
    height_increase = max(increase_area, ((1 + 2 * increase_area) * width - height) / (2 * height))

    left = int(left - width_increase * width)
    top = int(top - height_increase * height)
    right = int(right + width_increase * width)
    bot = int(bot + height_increase * height)

    # Clamp the padded box to the frame.
    top, bot, left, right = max(0, top), min(bot, frame_shape[0]), max(0, left), min(right, frame_shape[1])
    h, w = bot - top, right - left

    start = start / fps
    end = end / fps
    duration = end - start

    scale = f'{image_shape[0]}:{image_shape[1]}'
    source = shlex.quote(str(inp))

    return f'ffmpeg -i {source} -ss {start} -t {duration} -filter:v "crop={w}:{h}:{left}:{top}, scale={scale}" crop.mp4'

def compute_bbox_trajectories(trajectories, fps, frame_shape, args):
    """Return crop commands for trajectories longer than ``args.min_frames``.

    Each trajectory is a 4-item sequence (bbox, tube_bbox, start, end); one
    command is produced per trajectory where ``end - start`` exceeds the
    minimum, in input order.
    """
    return [
        compute_bbox(
            first, last, fps, tube, frame_shape,
            inp=args.inp, image_shape=args.image_shape, increase_area=args.increase,
        )
        for _, tube, first, last in trajectories
        if (last - first) > args.min_frames
    ]

def process_video(args):
    """Scan a video for face trajectories and return ffmpeg crop commands.

    Reads ``args.inp`` frame by frame and runs the module-level
    ``facedetector`` on each frame. A trajectory is a list
    ``[initial_bbox, tube_bbox, start_frame, end_frame]``; it stays alive
    while some detection overlaps its initial box with IoU above
    ``args.iou_with_initial``. Trajectories that end, plus those still
    alive at the end of the video, become crop commands via
    ``compute_bbox_trajectories``.

    The capture is released even if detection raises.
    """
    videocapture = cv2.VideoCapture(args.inp)
    try:
        fps = int(videocapture.get(cv2.CAP_PROP_FPS))
        resolution = (int(videocapture.get(cv2.CAP_PROP_FRAME_WIDTH)), int(videocapture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        num_frames = int(videocapture.get(cv2.CAP_PROP_FRAME_COUNT))
        # (width, height) -> (height, width, 3)
        frame_shape = resolution[::-1] + (3, )
        print("fps:", fps)
        print("resolution:", resolution)
        print("num frames:", num_frames)

        trajectories = []
        commands = []
        i = 0
        while True:
            success, frame = videocapture.read()
            if not success:
                break
            faces, _ = facedetector(frame)
            bboxes = faces[0].cpu().numpy()

            ## For each trajectory check the criterion
            not_valid_trajectories = []
            valid_trajectories = []

            for trajectory in trajectories:
                tube_bbox = trajectory[0]
                intersection = 0
                for bbox in bboxes:
                    intersection = max(intersection, IoU(tube_bbox, bbox))
                if intersection > args.iou_with_initial:
                    valid_trajectories.append(trajectory)
                else:
                    not_valid_trajectories.append(trajectory)

            commands += compute_bbox_trajectories(not_valid_trajectories, fps, frame_shape, args)
            trajectories = valid_trajectories

            ## Assign bbox to trajectories, create new trajectories
            for bbox in bboxes:
                intersection = 0
                current_trajectory = None
                for trajectory in trajectories:
                    tube_bbox = trajectory[0]
                    current_intersection = IoU(tube_bbox, bbox)
                    if intersection < current_intersection and current_intersection > args.iou_with_initial:
                        intersection = current_intersection
                        current_trajectory = trajectory

                ## Create new trajectory
                if current_trajectory is None:
                    trajectories.append([bbox, bbox, i, i])
                else:
                    # Grow the tube to cover the new box and extend its end.
                    current_trajectory[1] = join(current_trajectory[1], bbox)
                    current_trajectory[3] = i

            i += 1

        commands += compute_bbox_trajectories(trajectories, fps, frame_shape, args)
        return commands
    finally:
        videocapture.release()

if __name__ == "__main__":
    # Script entry point: parse the options, scan the video and print one
    # ffmpeg crop command per line.
    cli = ArgumentParser()
    cli.add_argument("--inp", required=True, help='Input image or video')
    cli.add_argument(
        "--image_shape",
        default=(512, 512),
        # "W,H" text -> tuple of ints
        type=lambda x: tuple(map(int, x.split(','))),
        help="Image shape",
    )
    cli.add_argument("--increase", default=0.1, type=float, help='Increase bbox by this amount')
    cli.add_argument("--iou_with_initial", type=float, default=0.25, help="The minimal allowed iou with inital bbox")
    cli.add_argument("--min_frames", type=int, default=50,  help='Minimum number of frames')

    for command in process_video(cli.parse_args()):
        print (command)