epfl-cs358 / 2024sp-robopong

2 stars 0 forks source link

[CV] Interface with microcontrollers and camera + run algos in one program #19

Closed Amene-Gafsi closed 5 months ago

Amene-Gafsi commented 5 months ago

The objective of this algorithm is to track and predict the trajectory of a ball on a playing field, along with detecting paddle positions in real-time. The main method orchestrates this by initially calibrating the camera to accurately detect markers within the environment, enhancing the precision of object tracking.

Here's a breakdown of how the algorithm functions:

1. Camera Calibration: The camera is calibrated using ArUco markers. This step helps to establish a frame of reference and is essential for accurately positioning the ball and paddles within the captured video frames. 2. Frame Processing: Each video frame is processed to detect and track the ball and paddles. This involves applying a Gaussian blur for noise reduction, converting the frame to the HSV color space, and then using color thresholding to create masks that isolate the ball and paddles based on their colors. Contours are extracted from these masks to determine the positions and sizes of the ball and paddles. 3. Position and Movement Analysis: The positions of the ball are continuously recorded with timestamps. Using this data, the algorithm calculates the velocity of the ball in both horizontal and vertical directions. 4. Prediction of Ball Trajectory: With the calculated velocities, the algorithm predicts where the ball will ultimately end up on the y-axis of the playing field. This prediction is based on the physical properties of the playing field and the dynamics of motion influenced by gravity. 5. Real-Time Updates and Predictions: As the game progresses, the algorithm constantly updates its predictions about the ball's final position on the y-axis, helping in strategizing the paddle positions to effectively interact with the ball.

The algorithm operates in a loop within the main method, capturing frames in real-time, processing these frames to track and analyze movements, and predicting future positions.

import cv2
import cv2.aruco as aruco
from collections import deque
from imutils.video import VideoStream
import numpy as np
import imutils
import time

greenLower = (20, 50, 100)
greenUpper = (90,255,180)
redLower = (0, 120, 70)
redUpper = (10, 255, 255)

ball_pts = deque(maxlen=64)
paddle1_pts = deque(maxlen=64)
paddle2_pts = deque(maxlen=64)

paddle_width_lower = 100
ball_radius_lower = 10
board_center = 300
board_width = 600
ball_to_edge_max_dist = 50
tolerance = 1

def calculate_velocity(observations):
    # Extract positions and times
    times = np.array([obs[0] for obs in observations])
    xs = np.array([obs[1] for obs in observations])
    ys = np.array([obs[2] for obs in observations])

    # Linear regression to find velocity
    vx, _ = np.polyfit(times, xs, 1)
    vy, _ = np.polyfit(times, ys, 1)
    return vx, vy

def predict_final_position_and_time(initial_position, vx, vy):
    # Constants
    g = 0.0098  # mm/ms^2 adjusted for scale
    length = 150  # mm, half the table length
    height = 1.5  # mm, height difference
    theta = np.arctan(height / length)
    gx = g * np.sin(theta)

    # Initial positions and velocities
    x, y = initial_position
    board_width = 300  # Total width of the board

    # First check if the ball will reach 150 mm, the midpoint
    # Time to reach midpoint if constant velocity is maintained
    if vx == 0:
        raise ValueError("Velocity in x cannot be zero for the calculation.")
    time_to_mid = (150 - x) / vx if vx != 0 else float('inf')
    vx_mid = vx - gx * time_to_mid
    x_mid = x + vx * time_to_mid - 0.5 * gx * time_to_mid**2
    y_mid = y + vy * time_to_mid

    # Check if it reaches the end of the board
    # Time to go from midpoint to the end of the board
    time_to_end = (board_width - x_mid) / vx_mid if vx_mid != 0 else float('inf')
    y_end = y_mid + vy * time_to_end

    # Total time from start to the end of the board
    total_time = time_to_mid + time_to_end

    return (y_end, total_time)

def calibrate_camera (vs) :
    aruco_dict = aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
    while True:
        ret, frame = vs.read()
        if not ret:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  
        corners, ids, rejectedImgPoints = aruco.detectMarkers(gray, aruco_dict) 
        if ids is not None and len(corners) > 0:
            aruco.drawDetectedMarkers(frame, corners, ids)    
            if ids is not None and len(corners) >= 4:
                flat_corners = [corner[0] for corner in corners]     
                for i in range(len(flat_corners)):
                    next_i = (i + 1) % len(flat_corners)
                    start_point = tuple(flat_corners[i][0].astype(int))
                    end_point = tuple(flat_corners[next_i][0].astype(int))
                    cv2.line(frame, start_point, end_point, (0, 255, 0), 2)
        # Display the frame
        cv2.imshow('Frame', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

def process_frame(frame):
    blurred = cv2.GaussianBlur(frame, (11, 11), 0)
    hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV)

    ball_mask = cv2.inRange(hsv, greenLower, greenUpper)
    ball_mask = cv2.erode(ball_mask, None, iterations=2)
    ball_mask = cv2.dilate(ball_mask, None, iterations=2)
    ball_cnts = cv2.findContours(ball_mask.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    ball_cnts = imutils.grab_contours(ball_cnts)
    ball_center = None

    paddle_mask = cv2.inRange(hsv, redLower, redUpper)
    paddle_mask = cv2.erode(paddle_mask, None, iterations=2)
    paddle_mask = cv2.dilate(paddle_mask, None, iterations=2)
    paddle_cnts = cv2.findContours(paddle_mask.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    paddle_cnts = imutils.grab_contours(paddle_cnts)
    paddle_center = None

    if len(ball_cnts) > 0:
        ball_c = max(ball_cnts, key=cv2.contourArea)
        ((ball_x, ball_y), ball_radius) = cv2.minEnclosingCircle(ball_c)        
        if ball_radius > ball_radius_lower:
            M = cv2.moments(ball_c)
            ball_center = (int(M["m10"] / M["m00"]), int(M["m01"] / M["m00"]))
            cv2.circle(frame, (int(ball_x), int(ball_y)), int(ball_radius),
                (0, 255, 255), 2)
            cv2.circle(frame, ball_center, 5, (0, 0, 255), -1)

    if len(paddle_cnts) > 0:
        paddle_c = max(paddle_cnts, key=cv2.contourArea)
        paddle_x, paddle_y, paddle_w, paddle_h = cv2.boundingRect(paddle_c)
        if paddle_w > paddle_width_lower:
            paddle_center = (int(paddle_x + paddle_w / 2), int(paddle_y + paddle_h / 2))
            cv2.rectangle(frame, (paddle_x, paddle_y), (paddle_x + paddle_w, paddle_y + paddle_h), (0, 255, 0), 2)
            cv2.circle(frame, paddle_center, 5, (0, 0, 255), -1)

    if ball_center is not None : 
        print(f"Ball Center: {ball_center}")
        timestamp = time.time()
        ball_pts.appendleft((timestamp, ball_center))
    if paddle_center is not None:
        if paddle_center[0] < board_center:
            paddle1_pts.appendleft(paddle_center)
        else:
            paddle2_pts.appendleft(paddle_center)

def main():
    vs = cv2.VideoCapture(1, cv2.CAP_DSHOW)
    calibrate_camera(vs)
    last_predicted_position = 0

    while True:
        frame = vs.read()[1]
        if frame is None: break
        process_frame(frame)
        if len(ball_pts) >= 2:
            # Get the last 64 ball positions and times
            observations = [(timestamp, x, y) for timestamp, (x, y) in ball_pts]   

            # Compute last y position prediction
            vx, vy = calculate_velocity(observations)
            _, last_obs_x, last_obs_y = observations[-1]
            y_position_at_end, total_time = predict_final_position_and_time((last_obs_x, last_obs_y), vx, vy)

            # Only consider cases where the difference between the old and new prediction are different enough
            if(abs(last_predicted_position - y_position_at_end) > tolerance):
                print(f"Y-position at end of board: {y_position_at_end} mm, Time to reach end: {total_time} ms")
                last_predicted_position = y_position_at_end

            # check if last y from ball_pts is close enough to end of board then clar ball_pts
            if (last_obs_y > board_width - ball_to_edge_max_dist or last_obs_y < ball_to_edge_max_dist):
                ball_pts.clear()

        cv2.imshow("Frame", frame)
        key = cv2.waitKey(1) & 0xFF
        if key == ord("q"):
            break

    vs.release()
    cv2.destroyAllWindows()

main()
Amene-Gafsi commented 5 months ago

Once the position is predicted, we can transmit this data to the Arduino to accordingly control the motor. This can be achieved with a simple piece of code in Python, placed within the tolerance condition of the main function:

import serial
import time

# Initialize the serial connection
ser = serial.Serial('COM3', 9600)  # Adjust 'COM3' to your Arduino's COM port
time.sleep(2)
ser.write(f"{last_predicted_position},{total_time}\n".encode())
ser.close()  # Close the serial connection when done

For the Arduino side, to receive and process the variables, the following simple code can be used:

void setup() {
    Serial.begin(9600); // Start serial communication at 9600 bps
}

void loop() {
    if (Serial.available()) {
        String data = Serial.readStringUntil('\n'); // Read the data until newline
        int separator = data.indexOf(',');
        long last_predicted_position = data.substring(0, separator).toInt();
        long total_time = data.substring(separator + 1).toInt();

        Serial.print("Received last position: ");
        Serial.print(last_predicted_position);
        Serial.print(", Total time: ");
        Serial.println(total_time);
    }
}