roboflow / supervision


Tracker IDs Skipping #1355

Open likith1908 opened 1 month ago

likith1908 commented 1 month ago

Search before asking

Question

import argparse
import supervision as sv
import cv2
from ultralytics import YOLO
import numpy as np
from collections import defaultdict, deque

SOURCE = np.array([[0, 0], [3072, 0], [3072, 1080], [0, 1080]])

TARGET_WIDTH = 0.7388
TARGET_HEIGHT = 0.2594

TARGET = np.array([
    [0, 0],
    [TARGET_WIDTH, 0],
    [TARGET_WIDTH, TARGET_HEIGHT],
    [0, TARGET_HEIGHT]
])
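
# SOURCE is the monitored region in pixel coordinates; TARGET is the same
# region in real-world units (presumably metres, given the m/s -> km/h
# conversion in main() below). ViewTransformer computes the homography
# that maps points from one plane to the other.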

class ViewTransformer:
    def __init__(self, source: np.ndarray, target: np.ndarray):
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            print("Warning: No points to transform.")
            return points.reshape(-1, 2)  # keep the (N, 2) shape for downstream code
        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

def parse_arguments() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Speed Estimation using Ultralytics and Supervision"
    )
    parser.add_argument(
        "--source_video_path",
        required=False,
        default="/input.mp4",
        help="Path to the source video file",
        type=str,
    )
    return parser.parse_args()

def main():
    args = parse_arguments()
    print(f"Source video path: {args.source_video_path}")

    video_info = sv.VideoInfo.from_video_path(args.source_video_path)
    print(f"Video info: {video_info}")

    model = YOLO("model.pt")
    print("YOLO model loaded.")

    byte_track = sv.ByteTrack(frame_rate=video_info.fps)
    print("ByteTrack initialized.")

    thickness = sv.calculate_optimal_line_thickness(resolution_wh=video_info.resolution_wh)
    text_scale = sv.calculate_optimal_text_scale(resolution_wh=video_info.resolution_wh)
    bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=thickness, color_lookup=sv.ColorLookup.TRACK)
    label_annotator = sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.BOTTOM_CENTER, color_lookup=sv.ColorLookup.TRACK)
    trace_annotator = sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps * 2, position=sv.Position.BOTTOM_CENTER, color_lookup=sv.ColorLookup.TRACK)
    frame_generator = sv.get_video_frames_generator(args.source_video_path)
    polygon_zone = sv.PolygonZone(SOURCE)

    # box_annotator = sv.BoxAnnotator(thickness=4, text_thickness=4, text_scale=2)
    # box_annotator = sv.BoundingBoxAnnotator(thickness=thickness)
    zone_annotator = sv.PolygonZoneAnnotator(zone=polygon_zone, color=sv.Color.WHITE, thickness=6, text_thickness=6, text_scale=4)

    view_transformer = ViewTransformer(SOURCE, TARGET)
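    # Keep roughly the last second of transformed y-coordinates per tracker ID;
    # speed is estimated over this sliding window.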
    coordinates = defaultdict(lambda: deque(maxlen=video_info.fps))

    frame_count = 0
    with sv.VideoSink(target_path='output.mp4', video_info=video_info) as sink:

        for frame in frame_generator:

            try:
                frame_count += 1
                print(f"Processing frame {frame_count}/{video_info.total_frames}")

                # Ensure the frame is valid
                if frame is None:
                    print(f"Frame {frame_count} is None, skipping.")
                    continue

                result = model(frame)
                print("Frame processed by model.")

                if not result:
                    print(f"No result for frame {frame_count}, skipping.")
                    continue

                detections = sv.Detections.from_ultralytics(result[0])
                print(f"Detections: {detections}")

                # Keep only detections inside the polygon zone, then update the tracker
                detections = detections[polygon_zone.trigger(detections)]
                detections = byte_track.update_with_detections(detections=detections)

                # Project each box's bottom-center anchor into the TARGET plane
                points = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                if points.size > 0:
                    points = view_transformer.transform_points(points=points)
                else:
                    print("No points detected in the frame.")

                labels = []
                for tracker_id, [_, y] in zip(detections.tracker_id, points):
                    coordinates[tracker_id].append(y)
                    # Wait for at least half a second of samples before estimating speed
                    if len(coordinates[tracker_id]) < video_info.fps / 2:
                        labels.append(f"#{tracker_id}")
                    else:
                        # Newest and oldest y-positions in the window (deque[0] is the oldest sample)
                        coordinates_start = coordinates[tracker_id][-1]
                        coordinates_stop = coordinates[tracker_id][0]
                        distance = abs(coordinates_start - coordinates_stop)
                        time = len(coordinates[tracker_id]) / video_info.fps
                        # distance is in TARGET units over `time` seconds; * 3.6 converts m/s to km/h
                        speed = (distance / time) * 3.6

                        print(f"Tracker ID: {tracker_id}")
                        print(f"Coordinates Start: {coordinates_start}")
                        print(f"Coordinates Stop: {coordinates_stop}")
                        print(f"Distance: {distance}")
                        print(f"Time: {time}")
                        print(f"Speed: {speed} km/h")

                        labels.append(f"#{tracker_id}, {speed:.1f} km/h")

                annotated_frame = frame.copy()
                annotated_frame = trace_annotator.annotate(scene=annotated_frame, detections=detections)
                annotated_frame = bounding_box_annotator.annotate(scene=annotated_frame, detections=detections)
                annotated_frame = sv.draw_polygon(annotated_frame, polygon=SOURCE, color=sv.Color.RED)
                annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels)
                annotated_frame = zone_annotator.annotate(scene=annotated_frame)            
                # cv2.namedWindow("Annotated Frame", cv2.WINDOW_NORMAL)
                # cv2.imshow("Annotated Frame", annotated_frame)
                sink.write_frame(frame=annotated_frame)
                if cv2.waitKey(1) == ord("q"):
                    break
            except Exception as e:
                print(f"Error processing frame {frame_count}: {e}")

        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

I can provide model.pt if you'd like to reproduce this.

Link to video: https://www.youtube.com/watch?v=uWP6UjDeZvY

Can someone explain this? Is there an issue with the tracker IDs, or is there a mistake in my code? @LinasKo @skylargivens @iurisilvio @sberan

Thanks Likith G

likith1908 commented 1 month ago

Hi @LinasKo,

I see a similar issue was closed, and I tried using supervision==0.21.0rc5 as mentioned in #1196 to overcome the tracker_id issue, but it leads me back to the bug we were facing earlier (#1345), so I had to return to supervision==0.22.0rc1.

Thanks Likith

LinasKo commented 1 month ago

Hi again @likith1908 👋

You don't have to tag any members of the team - we see every issue coming in 😉 (Also, the only core members working on supervision are me and @SkalskiP)

Regarding tracking, it will skip IDs if they're uncertain. We have this in our sights and may fix it in supervision 0.24.0 or supervision 0.25.0, estimated for September or October.
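
For anyone who wants to observe this directly, here is a minimal sketch (the det helper and box values are made up for illustration, and the exact output depends on the supervision version and ByteTrack thresholds): feed the tracker one stable detection, one single-frame flicker, and one late-arriving object. If the internal ID counter also advances for tentative tracks that are never confirmed, the reported tracker IDs will skip values.

import numpy as np
import supervision as sv

tracker = sv.ByteTrack(frame_rate=30)

def det(boxes):
    # Wrap raw xyxy boxes as high-confidence detections of a single class
    boxes = np.array(boxes, dtype=float).reshape(-1, 4)
    return sv.Detections(
        xyxy=boxes,
        confidence=np.full(len(boxes), 0.9),
        class_id=np.zeros(len(boxes), dtype=int),
    )

stable = [10, 10, 50, 50]        # present in every frame
flicker = [200, 200, 240, 240]   # present for a single frame only
newcomer = [400, 400, 440, 440]  # appears later and persists

for frame_idx in range(12):
    boxes = [stable]
    if frame_idx == 3:
        boxes.append(flicker)
    if frame_idx >= 6:
        boxes.append(newcomer)
    tracked = tracker.update_with_detections(det(boxes))
    print(frame_idx, tracked.tracker_id)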

likith1908 commented 1 month ago

Thanks, and sorry for tagging everyone. I'd like to know what is causing the IDs to be skipped.

Likith

LinasKo commented 1 month ago

We don't know yet. Here's the info we have: https://github.com/roboflow/supervision/issues/1320