yihong1120 / Construction-Hazard-Detection

Enhances construction site safety using YOLOv8 for object detection, identifying hazards like workers without helmets or safety vests, and proximity to machinery or vehicles. HDBSCAN clusters safety cone coordinates to create monitored zones. Post-processing algorithms improve detection accuracy.
https://wiki.mutable.ai/yihong1120/Construction-Hazard-Detection
GNU Affero General Public License v3.0
186 stars, 18 forks

Fix code scanning alert - Uncontrolled data used in path expression #15

Open yihong1120 opened 1 month ago

yihong1120 commented 1 month ago

Tracking issue for:

codeautopilot[bot] commented 1 month ago

Potential solution

To solve the bug related to "Uncontrolled data used in path expression," we need to ensure that all user inputs used in path expressions are properly validated and sanitized. This will prevent path traversal vulnerabilities and other security issues. The plan involves:

  1. Sanitizing and validating user inputs: Ensure that inputs like URLs, file names, and directory paths do not contain malicious content.
  2. Using secure methods for path handling: Utilize libraries and functions that help in securely constructing and validating paths.
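
The second step of the plan can be sketched with `pathlib`. This is a minimal illustration, not code from the repository: the helper name `resolve_inside` and the idea of a single trusted base directory are assumptions made for the example.

```python
from pathlib import Path


def resolve_inside(base_dir: str, user_path: str) -> Path:
    """Return user_path resolved under base_dir, or raise ValueError.

    Hypothetical helper: neither the name nor the signature comes from
    the project.
    """
    base = Path(base_dir).resolve()
    # Join first and resolve afterwards, so '..' components are collapsed
    # and symlinks are followed before the containment check runs.
    candidate = (base / user_path).resolve()
    if candidate != base and base not in candidate.parents:
        raise ValueError(f"Path escapes {base}: {user_path!r}")
    return candidate
```

Checking the resolved path rather than the raw string means inputs such as `a/../../b` or an absolute path are rejected without needing a list of forbidden substrings.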

What is causing this bug?

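The bug is caused by the lack of validation and sanitization of user inputs used in path expressions. Specifically, the stream URL in src/stream_capture.py, the model_name parameter in src/model_fetcher.py, and the output_folder and model_key parameters in src/live_stream_detection.py are used without any checks.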

Code

Here are the implementation details and code snippets to fix the issues:

src/stream_capture.py

Changes:

  1. Sanitize and validate the stream URL.
import re
from urllib.parse import urlparse
import argparse
import gc
import cv2
import time

class StreamCapture:
    def __init__(self, stream_url: str, capture_interval: int = 15):
        self.stream_url = stream_url
        self.cap: cv2.VideoCapture | None = None
        self.capture_interval = capture_interval

    def initialise_stream(self, stream_url: str) -> None:
        self.cap = cv2.VideoCapture(stream_url)
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
        if not self.cap.isOpened():
            time.sleep(5)
            self.cap.open(stream_url)

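    def execute_capture(self):
        # Frame capture logic is elided here. The caller below iterates over
        # (frame, timestamp) pairs, so this stub is an empty generator
        # rather than a function returning None.
        yield from ()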

def is_valid_url(url: str) -> bool:
    parsed_url = urlparse(url)
    return all([parsed_url.scheme, parsed_url.netloc])

def sanitize_url(url: str) -> str:
    url = re.sub(r'[^\w\-/:.?&=]', '', url)
    return url

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Capture video stream frames.')
    parser.add_argument('--url', type=str, help='Live stream URL', required=True)
    args = parser.parse_args()

    sanitized_url = sanitize_url(args.url)
    if not is_valid_url(sanitized_url):
        raise ValueError("Invalid URL provided")

    stream_capture = StreamCapture(sanitized_url)
    for frame, timestamp in stream_capture.execute_capture():
        print(f"Frame at {timestamp} displayed")
        del frame
        gc.collect()
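
Two limitations of the snippet above are worth noting. `is_valid_url` accepts any scheme as long as a host is present, and `sanitize_url` silently removes characters such as `@` and `%`, which changes URLs that carry credentials or percent-encoding. A stricter alternative is to reject bad input instead of rewriting it. The sketch below assumes an allowlist of schemes; the set shown and the function name `validate_stream_url` are illustrative, not taken from the project.

```python
from urllib.parse import urlparse

# Assumed allowlist; adjust to the protocols the cameras actually use.
ALLOWED_SCHEMES = {"http", "https", "rtsp", "rtmp"}


def validate_stream_url(url: str) -> str:
    """Return url unchanged if it looks like a network stream URL."""
    parsed = urlparse(url)
    if parsed.scheme.lower() not in ALLOWED_SCHEMES:
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme!r}")
    # hostname is None when the URL has no network location at all.
    if not parsed.hostname:
        raise ValueError("URL has no host")
    return url
```

With this approach a local path such as `/etc/passwd` or a `file://` URL is refused outright, while a URL like `rtsp://user:pw@cam.local:554/live` passes through unmodified.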

src/model_fetcher.py

Changes:

  1. Sanitize the model_name parameter to prevent path traversal.
from pathlib import Path
from typing import TypedDict
import requests
import re

class ModelInfo(TypedDict):
    model_name: str
    url: str

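def sanitize_model_name(model_name: str) -> str:
    # Accept only a bare file name such as 'best_yolov8l.pt': word characters
    # and hyphens followed by '.pt', with no path separators or '..'.
    if not re.fullmatch(r'[\w\-]+\.pt', model_name):
        raise ValueError(f"Invalid model name: {model_name}")
    return model_name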

def download_model(model_name, url):
    model_name = sanitize_model_name(model_name)
    LOCAL_MODEL_DIRECTORY = Path('models/pt/')
    LOCAL_MODEL_DIRECTORY.mkdir(parents=True, exist_ok=True)
    local_file_path = LOCAL_MODEL_DIRECTORY / model_name

    if local_file_path.exists():
        print(f"'{model_name}' exists. Skipping download.")
        return

    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(local_file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"'{model_name}' saved to '{local_file_path}'.")
    else:
        print(f"Error downloading '{model_name}': {response.status_code}")

def main():
    MODEL_URLS = {
        'best_yolov8l.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8l.pt',
        'best_yolov8x.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8x.pt',
    }

    for model_name, url in MODEL_URLS.items():
        download_model(model_name, url)

if __name__ == '__main__':
    main()

src/live_stream_detection.py

Changes:

  1. Validate and sanitize the output_folder and model_key parameters.
  2. Validate the stream_url parameter.
import os
import argparse
import cv2
from pathlib import Path
import re
from urllib.parse import urlparse

class LiveStreamDetector:
    def __init__(self, api_url: str = 'http://localhost:5000', model_key: str = 'yolov8l', output_folder: str | None = None, run_local: bool = True):
        self.api_url = api_url
        self.model_key = self.sanitize_model_key(model_key)
        self.output_folder = self.sanitize_output_folder(output_folder)
        self.run_local = run_local

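    def sanitize_output_folder(self, output_folder: str | None) -> str | None:
        if output_folder:
            base_dir = os.path.realpath(os.getcwd())
            output_folder = os.path.realpath(output_folder)
            # Compare whole path components; a plain startswith() would also
            # accept sibling directories such as '<cwd>-other'.
            if os.path.commonpath([base_dir, output_folder]) != base_dir:
                raise ValueError("Invalid output folder path")
        return output_folder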

    def sanitize_model_key(self, model_key: str) -> str:
        if not re.match(r'^[\w\-]+$', model_key):
            raise ValueError(f"Invalid model key: {model_key}")
        return model_key

    def generate_detections_local(self, frame: cv2.Mat) -> list[list[float]]:
        model_path = Path('models/pt/') / f"best_{self.model_key}.pt"
        # Implementation of detection logic
        pass

    def run_detection(self, stream_url: str) -> None:
        if not self.is_valid_url(stream_url):
            raise ValueError("Invalid stream URL")
        cap = cv2.VideoCapture(stream_url)
        # Implementation of detection logic
        pass

    def is_valid_url(self, url: str) -> bool:
        parsed_url = urlparse(url)
        return all([parsed_url.scheme, parsed_url.netloc])

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Perform live stream detection and tracking using YOLOv8.')
    parser.add_argument('--url', type=str, help='Live stream URL', required=True)
    parser.add_argument('--api_url', type=str, default='http://localhost:5000', help='API URL for detection')
    parser.add_argument('--model_key', type=str, default='yolov8n', help='Model key for detection')
    parser.add_argument('--output_folder', type=str, help='Output folder for detected frames')
    parser.add_argument('--run_local', action='store_true', help='Run detection using local model')
    args = parser.parse_args()

    detector = LiveStreamDetector(api_url=args.api_url, model_key=args.model_key, output_folder=args.output_folder, run_local=args.run_local)
    detector.run_detection(args.url)
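
An alternative to pattern matching for `model_key` is an explicit allowlist, so that only known weights can ever be loaded. The following is a sketch under assumptions: the set of keys and the helper name `model_path_for` are hypothetical, and the real list depends on which model files the project ships.

```python
from pathlib import Path

# Hypothetical set of supported keys; not taken from the repository.
ALLOWED_MODEL_KEYS = frozenset(
    {"yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x"}
)
MODEL_DIR = Path("models/pt")


def model_path_for(model_key: str) -> Path:
    """Map a known model key to its weights file, rejecting anything else."""
    if model_key not in ALLOWED_MODEL_KEYS:
        raise ValueError(f"Unknown model key: {model_key!r}")
    return MODEL_DIR / f"best_{model_key}.pt"
```

Because the key is compared against fixed values, no user-controlled string is ever interpolated into the path unless it is one of the expected names.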

How to replicate the bug

  1. For src/stream_capture.py:

    • Run the script with a malicious URL input that contains path traversal sequences.
    • Example: python src/stream_capture.py --url "http://example.com/../../etc/passwd"
  2. For src/model_fetcher.py:

    • Run the script with a malicious model name that contains path traversal sequences.
    • Example: Modify MODEL_URLS to include a key like ../etc/passwd.
  3. For src/live_stream_detection.py:

    • Run the script with malicious inputs for --output_folder and --model_key.
    • Example: python src/live_stream_detection.py --url "http://example.com" --output_folder "../../etc" --model_key "../etc/passwd"
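
The replication inputs above can also be turned into a quick regression check. The sketch below reuses the character class from `sanitize_model_key` but applies it with `re.fullmatch`; the helper name `is_safe_key` and the sample inputs are assumptions made for illustration.

```python
import re

# Character class taken from sanitize_model_key, applied with fullmatch so
# the whole string must consist of word characters and hyphens.
SAFE_KEY = re.compile(r"[\w\-]+")


def is_safe_key(value: str) -> bool:
    """Return True only for keys with no separators, dots, or whitespace."""
    return SAFE_KEY.fullmatch(value) is not None


# Print a verdict for one benign key and several traversal attempts.
for candidate in ["yolov8l", "../etc/passwd", "..", "best/../../x"]:
    verdict = "accepted" if is_safe_key(candidate) else "rejected"
    print(f"{candidate!r}: {verdict}")
```

Running a check like this before and after applying the fix makes it easy to confirm that traversal strings no longer reach the path construction.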

By implementing the recommended changes, these vulnerabilities will be mitigated, ensuring that user inputs are properly validated and sanitized.


Files used for this task:

Changes on src/stream_capture.py

## Analysis Report

### Overview

The file `src/stream_capture.py` is responsible for capturing frames from a video stream. It includes functionalities to initialize the stream, capture frames at specified intervals, check internet speed, select stream quality based on speed, and handle failures in capturing frames.

### Potential Security Issue

The primary concern mentioned in the issue is the "Uncontrolled data used in path expression," which can lead to path traversal vulnerabilities. In this file, the user input is primarily the stream URL provided via command-line arguments.

### Key Areas to Review

1. **Stream URL Handling**:
   - The stream URL is taken directly from user input without any validation or sanitization.
   - This URL is used in methods like `initialise_stream` and `select_quality_based_on_speed`.

### Detailed Analysis

#### Stream URL Handling

- **Initialization**:

```python
def __init__(self, stream_url: str, capture_interval: int = 15):
    self.stream_url = stream_url
    self.cap: cv2.VideoCapture | None = None
    self.capture_interval = capture_interval
```

- **Stream Initialization**:

```python
def initialise_stream(self, stream_url: str) -> None:
    self.cap = cv2.VideoCapture(stream_url)
    self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
    if not self.cap.isOpened():
        time.sleep(5)
        self.cap.open(stream_url)
```

- **Command-line Argument Parsing**:

```python
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Capture video stream frames.',
    )
    parser.add_argument(
        '--url',
        type=str,
        help='Live stream URL',
        required=True,
    )
    args = parser.parse_args()

    stream_capture = StreamCapture(args.url)
    for frame, timestamp in stream_capture.execute_capture():
        print(f"Frame at {timestamp} displayed")
        del frame
        gc.collect()
```

### Recommendations

1. **Sanitize and Validate Stream URL**:
   - Ensure that the stream URL is a valid URL and does not contain any malicious content.
   - Use libraries like `urllib.parse` to parse and validate the URL.
2. **Secure Methods for Handling URLs**:
   - Avoid directly using user inputs in critical operations without validation.
   - Implement a whitelist of allowed URL patterns if possible.

### Example Fix

Here is an example of how you can sanitize and validate the stream URL:

```python
import re
from urllib.parse import urlparse

def is_valid_url(url: str) -> bool:
    # Basic URL validation
    parsed_url = urlparse(url)
    return all([parsed_url.scheme, parsed_url.netloc])

def sanitize_url(url: str) -> str:
    # Remove any unwanted characters or patterns
    url = re.sub(r'[^\w\-/:.?&=]', '', url)
    return url

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Capture video stream frames.',
    )
    parser.add_argument(
        '--url',
        type=str,
        help='Live stream URL',
        required=True,
    )
    args = parser.parse_args()

    sanitized_url = sanitize_url(args.url)
    if not is_valid_url(sanitized_url):
        raise ValueError("Invalid URL provided")

    stream_capture = StreamCapture(sanitized_url)
    for frame, timestamp in stream_capture.execute_capture():
        print(f"Frame at {timestamp} displayed")
        del frame
        gc.collect()
```

### Conclusion

The primary issue in this file is the lack of validation and sanitization of the stream URL provided by the user. By implementing the recommended changes, you can mitigate the risk of path traversal and other related vulnerabilities.

Changes on src/model_fetcher.py

# Analysis Report for `src/model_fetcher.py`

## Overview

The `src/model_fetcher.py` file is responsible for downloading model files from specified URLs and saving them locally. The main function iterates over a dictionary of model names and URLs, and the `download_model` function handles the actual downloading and saving of the files.

## Potential Security Issue

The primary concern in this file is the potential for path traversal vulnerabilities due to uncontrolled data being used in path expressions. Specifically, the `model_name` parameter in the `download_model` function is used directly to construct file paths without any validation or sanitization.

## Detailed Analysis

1. **Path Construction**:

```python
LOCAL_MODEL_DIRECTORY = Path('models/pt/')
local_file_path = LOCAL_MODEL_DIRECTORY / model_name
```

   The `model_name` is directly appended to the `LOCAL_MODEL_DIRECTORY` to form the `local_file_path`. If `model_name` contains path traversal characters (e.g., `../`), it could lead to writing files outside the intended directory.

2. **File Existence Check**:

```python
if local_file_path.exists():
    print(f"'{model_name}' exists. Skipping download.")
    return
```

   This check ensures that the file is not downloaded if it already exists, but it does not mitigate the risk of path traversal.

3. **File Writing**:

```python
with open(local_file_path, 'wb') as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)
```

   Writing to the file without validating the `model_name` can lead to overwriting critical files if path traversal is exploited.

## Recommendations

To mitigate the risk of path traversal vulnerabilities, the following steps should be taken:

1. **Sanitize `model_name`**: Ensure that `model_name` does not contain any path traversal characters. This can be done by validating the input and allowing only safe characters (e.g., alphanumeric characters, underscores, and hyphens).
2. **Use Secure Methods for Path Handling**: Utilize secure methods to handle file paths, ensuring that the constructed paths are within the intended directory.

### Example Implementation

Here is an example of how to implement these recommendations:

```python
from __future__ import annotations

from pathlib import Path
from typing import TypedDict
import requests
import re


class ModelInfo(TypedDict):
    model_name: str
    url: str


def sanitize_model_name(model_name: str) -> str:
    """
    Sanitize the model name to prevent path traversal attacks.

    Args:
        model_name (str): The name of the model file.

    Returns:
        str: The sanitized model name.
    """
    # Allow only alphanumeric characters, underscores, and hyphens,
    # followed by the '.pt' extension used by the keys in MODEL_URLS
    if not re.fullmatch(r'[\w\-]+\.pt', model_name):
        raise ValueError(f"Invalid model name: {model_name}")
    return model_name


def download_model(model_name, url):
    """
    Download a model file if it doesn't already exist.

    Args:
        model_name (str): The name of the model file.
        url (str): The URL of the model file.
    """
    # Sanitize the model name
    model_name = sanitize_model_name(model_name)

    # Define the local directory to store the model files
    LOCAL_MODEL_DIRECTORY = Path('models/pt/')

    # Ensure the local directory exists
    LOCAL_MODEL_DIRECTORY.mkdir(parents=True, exist_ok=True)

    # Build the full local file path
    local_file_path = LOCAL_MODEL_DIRECTORY / model_name

    # Check if the model already exists
    if local_file_path.exists():
        print(f"'{model_name}' exists. Skipping download.")
        return

    # Send an HTTP GET request to fetch the model file
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        # Begin the download and write to the file
        with open(local_file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"'{model_name}' saved to '{local_file_path}'.")
    else:
        print(f"Error downloading '{model_name}': {response.status_code}")


def main():
    # Define the URLs for the model files
    MODEL_URLS = {
        'best_yolov8l.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8l.pt',
        'best_yolov8x.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8x.pt',
    }

    # Iterate over all models and download them if they don't already exist
    for model_name, url in MODEL_URLS.items():
        download_model(model_name, url)


if __name__ == '__main__':
    main()
```

## Conclusion

The primary issue in the `src/model_fetcher.py` file is the lack of validation and sanitization of the `model_name` parameter, which can lead to path traversal vulnerabilities. By implementing input sanitization and using secure methods for path handling, this vulnerability can be mitigated.

Changes on src/live_stream_detection.py

# Analysis Report for `src/live_stream_detection.py`

## Overview

The `src/live_stream_detection.py` file is responsible for performing live stream detection and tracking using YOLOv8 with SAHI. It includes functionalities for both local and cloud-based detection. The primary concern is to review and sanitize any user inputs used in path expressions to prevent path traversal attacks.

## Key Areas of Concern

1. **User Inputs in Path Expressions**:
   - The `output_folder` parameter in the `LiveStreamDetector` class.
   - The `model_key` parameter in the `LiveStreamDetector` class.
   - The `stream_url` parameter in the `run_detection` method.
   - Command-line arguments parsed using `argparse`.

## Detailed Analysis

### 1. `output_folder` Parameter

The `output_folder` parameter is used to specify the folder for detected frames. This parameter can potentially be manipulated to perform path traversal attacks.

**Current Usage**:

```python
def __init__(self, api_url: str = 'http://localhost:5000', model_key: str = 'yolov8l', output_folder: str | None = None, run_local: bool = True):
    ...
    self.output_folder = output_folder
    ...
```

**Recommendation**:
- Validate and sanitize the `output_folder` parameter to ensure it does not contain any path traversal sequences.
- Use `os.path.abspath` to convert the path to an absolute path and check if it is within a designated directory.

### 2. `model_key` Parameter

The `model_key` parameter is used to construct the model path. This parameter can also be manipulated to perform path traversal attacks.

**Current Usage**:

```python
def generate_detections_local(self, frame: cv2.Mat) -> list[list[float]]:
    ...
    model_path = Path('models/pt/') / f"best_{self.model_key}.pt"
    ...
```

**Recommendation**:
- Validate the `model_key` parameter to ensure it does not contain any path traversal sequences.
- Use a whitelist of allowed model keys to prevent arbitrary file access.

### 3. `stream_url` Parameter

The `stream_url` parameter is used to specify the URL of the live stream. While this is not directly related to path traversal, it is important to ensure that the URL is valid and safe.

**Current Usage**:

```python
def run_detection(self, stream_url: str) -> None:
    ...
    cap = cv2.VideoCapture(stream_url)
    ...
```

**Recommendation**:
- Validate the `stream_url` parameter to ensure it is a valid URL.
- Use a library like `validators` to check the URL format.

### 4. Command-line Arguments

The command-line arguments parsed using `argparse` include `--url`, `--api_url`, `--model_key`, `--output_folder`, and `--run_local`.

**Current Usage**:

```python
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Perform live stream detection and tracking using YOLOv8.')
    parser.add_argument('--url', type=str, help='Live stream URL', required=True)
    parser.add_argument('--api_url', type=str, default='http://localhost:5000', help='API URL for detection')
    parser.add_argument('--model_key', type=str, default='yolov8n', help='Model key for detection')
    parser.add_argument('--output_folder', type=str, help='Output folder for detected frames')
    parser.add_argument('--run_local', action='store_true', help='Run detection using local model')
    args = parser.parse_args()
```

**Recommendation**:
- Validate and sanitize the command-line arguments, especially `--output_folder` and `--model_key`.
- Implement validation checks to ensure these parameters do not contain any malicious input.

## Conclusion

The primary areas of concern in this file are the `output_folder` and `model_key` parameters, which are used in path expressions and can potentially be exploited for path traversal attacks. Additionally, the `stream_url` parameter and command-line arguments should be validated to ensure they are safe and valid.

### Action Items

1. Implement validation and sanitization for the `output_folder` parameter.
2. Implement validation and sanitization for the `model_key` parameter.
3. Validate the `stream_url` parameter to ensure it is a valid URL.
4. Validate and sanitize command-line arguments, especially `--output_folder` and `--model_key`.

By addressing these concerns, we can mitigate the risk of path traversal attacks and ensure the security of the application.