tryolabs / norfair

Lightweight Python library for adding real-time multi-object tracking to any detector.
https://tryolabs.github.io/norfair/
BSD 3-Clause "New" or "Revised" License

Speed Estimation of objects initialized and tracked by Norfair #232

Closed: ghost closed this issue 1 year ago

ghost commented 1 year ago

Is your feature request related to a problem? Please describe.
I want to perform speed estimation alongside the tracking of the objects detected by Norfair. Is there a way to implement this with the current version of the Norfair library?

Describe the solution you'd like
Knowing the history of the (x, y) coordinates of the bounding boxes associated with a particular object registered by Norfair, it would be possible to calculate the distance covered (after camera calibration for the particular scene). Currently I do not see any way of accessing this history by direct interaction with the objects returned by the tracker.update() method.

Describe alternatives you've considered
I do not know of any other way to obtain the distance covered over a time frame. Please let me know if there is one.
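The closest workaround I can see is to accumulate the history myself, outside the tracker, keyed by the id of each TrackedObject returned by update(). A minimal sketch (record and speed_estimate are hypothetical helpers I would write, and pixels_per_meter is a made-up calibration constant for illustration):

        import numpy as np

        # Accumulated positions per tracked-object id, filled once per processed frame:
        # history[obj.id] -> list of (frame_no, centroid) pairs.
        history = {}

        def record(tracked_objects, frame_no):
            """Store the current filtered position of every tracked object."""
            for obj in tracked_objects:
                # obj.estimate holds the Kalman-filtered points (n_points x 2).
                centroid = obj.estimate.mean(axis=0)
                history.setdefault(obj.id, []).append((frame_no, centroid))

        def speed_estimate(obj_id, fps, pixels_per_meter=20.0):
            """Average speed in m/s over the recorded history."""
            track = history.get(obj_id, [])
            if len(track) < 2:
                return None
            # Sum consecutive displacements to approximate the distance covered.
            pixel_dist = sum(np.linalg.norm(p2 - p1)
                             for (_, p1), (_, p2) in zip(track, track[1:]))
            elapsed_seconds = (track[-1][0] - track[0][0]) / fps
            return (pixel_dist / pixels_per_meter) / elapsed_seconds

Inside the inference loop below, record(tracked_objects, fno) would be called right after each tracker update.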

Additional context

        import cv2
        import numpy as np
        import pandas as pd
        from tqdm.notebook import trange
        from mmdet.apis import init_detector, inference_detector
        import norfair

        # Specify the path to model config and checkpoint file
        config_file = '/workspace/mmdet_train_runs_vehicle_dataset/mmdet_config_1_1x_RTX_A5000_Runpod/custom_config_1.py'
        checkpoint_file = '/workspace/mmdet_train_runs_vehicle_dataset/mmdet_config_1_1x_RTX_A5000_Runpod/latest.pth'
        mmdet_model = init_detector(config_file, checkpoint_file, device='cuda:0')

        labels = {"0": "vehicle", "1": "person"}
        input_video_path = '/workspace/vehicle_dataset/CCTV_vehicle_detection.mp4'
        # One row per processed frame; a DataFrame is built from these at the end.
        frame_info_rows = []

        # Two trackers: one fed with centroids, one with bounding boxes.
        centroid_tracker = norfair.Tracker(distance_function="mean_euclidean", distance_threshold=20, initialization_delay=5)
        bbox_tracker = norfair.Tracker(distance_function="mean_euclidean", distance_threshold=20, initialization_delay=5)
        output_video_path = '/workspace/vehicle_dataset/Inference_CCTV_vehicle_detection.mp4'
        frame_info_df_path = '/workspace/vehicle_dataset/Inference_CCTV_vehicle_detection.pkl'
        frame_info_df_html_path = '/workspace/vehicle_dataset/Inference_CCTV_vehicle_detection.html'

        output_fps = 3
        # Only every 5th frame is processed, so the time between processed frames is
        # video_frame_sampling_rate / fps seconds (relevant for any speed estimate).
        video_frame_sampling_rate = 5

        cap = cv2.VideoCapture(input_video_path)
        if not cap.isOpened():
            print("Unable to read video feed")
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print("FPS of the input video is ---> {}\n".format(fps))
        print("FPS of the output video is ---> {}\n".format(output_fps))
        print("width of videoframe---->{} height of videoframe -----> {}".format(frame_width,frame_height))

        out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), output_fps, (frame_width, frame_height))

        for fno in trange(0, total_frames, video_frame_sampling_rate, desc="Video Inference", leave=True):
            cap.set(cv2.CAP_PROP_POS_FRAMES, fno)
            ret, frame = cap.read()
            if not ret:
                break
            result = inference_detector(mmdet_model, frame)
            centroid_list = list()
            object_bbox_list = list()
            centroid_detections = list()
            object_detections = list()
            for index, category in enumerate(result):
                for item in category:
                    # item is [x1, y1, x2, y2, score]
                    object_bbox = np.array([[item[0], item[1]], [item[2], item[3]]])
                    object_score = np.array([item[4], item[4]])  # one score per bbox corner
                    centroid = np.array([float(item[0] + item[2]) / 2, float(item[1] + item[3]) / 2])
                    centroid_score = np.array([item[4]])
                    centroid_list.append((labels[str(index)], centroid))
                    object_bbox_list.append((labels[str(index)], object_bbox))
                    centroid_detections.append(norfair.Detection(points=centroid, scores=centroid_score, label=labels[str(index)]))
                    object_detections.append(norfair.Detection(points=object_bbox, scores=object_score, label=labels[str(index)]))
            # update() returns the list of currently tracked objects, not a tracker.
            tracked_centroids = centroid_tracker.update(detections=centroid_detections)
            tracked_boxes = bbox_tracker.update(detections=object_detections)

            frame_info_rows.append({'frame_no': fno, 'centroid_detections': centroid_list, 'object_detections': object_bbox_list, 'centroid_tracking': tracked_centroids, 'object_tracking': tracked_boxes})
            norfair.drawing.draw_tracked_objects(frame=frame, objects=tracked_centroids, draw_labels=False)
            #norfair.drawing.draw_points(frame=frame, detections=centroid_detections)
            #norfair.drawing.draw_tracked_objects(frame=frame, objects=tracked_boxes)
            norfair.drawing.draw_boxes(frame=frame, detections=object_detections, draw_labels=True)
            written_text = 'Frame No. ---> {}'.format(fno)
            org = (50, 50)
            font = cv2.FONT_HERSHEY_SIMPLEX
            fontScale = 2
            color = (255, 0, 0)
            thickness = 2
            frame = cv2.putText(frame, written_text, org, font, fontScale, color, thickness, cv2.LINE_AA)
            out.write(frame)

        frame_info_df = pd.DataFrame(frame_info_rows)
        frame_info_df.to_pickle(path=frame_info_df_path)
        with open(frame_info_df_html_path, 'w') as fo:
            fo.write(frame_info_df.to_html())
        cap.release()
        out.release()
        cv2.destroyAllWindows()
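For the camera-calibration step mentioned above, I am thinking of a ground-plane homography; a rough sketch (the four image/world point correspondences are placeholders, not measurements from my scene):

        import cv2
        import numpy as np

        # Four pixel locations on the road surface and their known ground-plane
        # positions in meters (placeholder values; real ones require measuring the scene).
        image_pts = np.float32([[100, 700], [1200, 700], [900, 300], [300, 300]])
        world_pts = np.float32([[0, 0], [10, 0], [10, 30], [0, 30]])
        H = cv2.getPerspectiveTransform(image_pts, world_pts)

        def to_ground_plane(pixel_xy):
            """Map a pixel coordinate to ground-plane meters via the homography."""
            pt = np.float32([[pixel_xy]])  # shape (1, 1, 2), as cv2 expects
            return cv2.perspectiveTransform(pt, H)[0, 0]

        # Distance covered between two tracked pixel positions, in meters:
        # np.linalg.norm(to_ground_plane(p1) - to_ground_plane(p0))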
DiegoFernandezC commented 1 year ago

Hi @avadhut-00!

Great application of Norfair! We are discussing different approaches to this problem with the team these days and will reply again soon.

Would it be useful for your application to have the history of past detections of each TrackedObject returned by the update method, or do you have another solution in mind? Currently, this is partially possible: each TrackedObject has an attribute called past_detections, and you can adjust its length when instantiating the tracker, tracker = Tracker(..., past_detections_length=x, ...). The problem with this history for your application is that the stored detections are distributed equally spaced in time. For this reason, the attribute could be problematic unless your detection history stays under a certain number x, so that past_detections can store all detections.
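As a rough sketch of how that could be used for your case (mean_displacement is a hypothetical helper, and the timing of the stored detections is the caveat described above):

        import numpy as np
        from norfair import Tracker

        tracker = Tracker(distance_function="mean_euclidean", distance_threshold=20,
                          past_detections_length=30)

        def mean_displacement(tracked_object):
            """Average pixel displacement between consecutive stored detections.
            Dividing by the (assumed) time between stored detections gives a speed."""
            points = [d.points.mean(axis=0) for d in tracked_object.past_detections]
            if len(points) < 2:
                return None
            steps = [np.linalg.norm(b - a) for a, b in zip(points, points[1:])]
            return float(np.mean(steps))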

Let us know if you think of another solution, or of any additional information that would be useful for the internal discussion. Greetings.

DiegoFernandezC commented 1 year ago

Hey @avadhut-00, we opened a PR that exposes a speed estimate for each TrackedObject, taken from the Kalman filter.

You can find more information in the description of that PR; we hope it will be merged in the coming days.
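Once it is merged, usage should look roughly like this (the estimate_velocity attribute and its pixels-per-frame units are taken from the PR and may change before release):

        import numpy as np

        def mean_speed_px_per_frame(tracked_object):
            """Mean point speed from the Kalman filter state, in pixels per frame.
            Multiply by the video fps (and divide by a pixels-per-meter calibration)
            to convert to real-world units."""
            velocity = tracked_object.estimate_velocity  # shape: (n_points, 2)
            return float(np.linalg.norm(velocity, axis=1).mean())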

Greetings!