Open yihong1120 opened 1 month ago
The plan to solve the bug involves validating and sanitizing user inputs and external data used in path expressions across all relevant files. This will mitigate the risk of path traversal attacks and ensure the security of the application. The solution includes implementing URL validation, sanitizing file paths, and adding error handling and logging mechanisms.
The bug is caused by the lack of validation and sanitization of user inputs and external data used in path expressions. Specifically:
- In `src/stream_capture.py`, the stream URL is taken directly from user input without validation.
- In `src/live_stream_detection.py`, parameters like `output_folder`, `stream_url`, and `model_key` are used without proper validation.
- In `src/stream_viewer.py`, the stream URL is used directly to initialize the video capture object without validation.
- In `src/model_fetcher.py`, the `model_name` is used to construct file paths without sanitization, leading to potential path traversal vulnerabilities.

Here are the implementation details and code snippets for each file:
src/stream_capture.py
from urllib.parse import urlparse
def is_valid_url(url: str) -> bool:
    """Return True when *url* parses into both a scheme and a network location.

    Args:
        url: Candidate stream URL, typically taken straight from user input.

    Returns:
        True if ``urlparse`` yields a non-empty scheme and netloc. False for
        everything else, including non-string input and strings that
        ``urlparse`` rejects outright.
    """
    # urlparse raises AttributeError (not ValueError) for objects such as
    # ints, so reject anything that is not a string up front.
    if not isinstance(url, str):
        return False
    try:
        result = urlparse(url)
    except ValueError:
        # Raised for malformed authorities, e.g. an unterminated "[" host.
        return False
    return bool(result.scheme and result.netloc)
# Usage: refuse to continue when the supplied URL fails validation.
url_is_valid = is_valid_url(args.url)
if not url_is_valid:
    raise ValueError("Invalid URL provided")
src/live_stream_detection.py
output_folder
from pathlib import Path
def sanitize_path(path: str, base_dir: str = "/safe/directory") -> str:
    """Resolve *path* inside *base_dir* and reject anything that escapes it.

    Args:
        path: Untrusted path, interpreted relative to *base_dir*.
        base_dir: Directory the result must stay within. Defaults to the
            previously hard-coded ``/safe/directory``.

    Returns:
        The fully resolved path as a string.

    Raises:
        ValueError: If the resolved path lies outside *base_dir*, e.g.
            because of ``..`` segments or because *path* is absolute.
    """
    safe_base = Path(base_dir).resolve()
    # Joining an absolute path discards safe_base entirely; resolve() then
    # collapses any ".." segments, so the containment check below sees the
    # real target.
    resolved_path = (safe_base / path).resolve()
    if not resolved_path.is_relative_to(safe_base):
        raise ValueError("Unsafe path detected")
    return str(resolved_path)
# Usage in the constructor
self.output_folder = sanitize_path(output_folder) if output_folder else None
stream_url
from urllib.parse import urlparse
def validate_url(url: str) -> str:
    """Return *url* unchanged if it has both a scheme and a netloc.

    Args:
        url: Candidate stream URL.

    Returns:
        The same *url* string that was passed in.

    Raises:
        ValueError: If *url* is not a string, cannot be parsed, or lacks a
            scheme or network location.
    """
    # Non-string objects make urlparse raise AttributeError; convert that
    # into the ValueError callers are told to expect.
    if not isinstance(url, str):
        raise ValueError("Invalid URL")
    try:
        result = urlparse(url)
    except ValueError as err:
        # Keep one message for every rejection reason.
        raise ValueError("Invalid URL") from err
    if not (result.scheme and result.netloc):
        raise ValueError("Invalid URL")
    return url
# Usage in the run_detection method
stream_url = validate_url(stream_url)
model_key
import re
def sanitize_model_key(model_key: str) -> str:
    """Return *model_key* if it is safe to embed in a model file name.

    Args:
        model_key: Untrusted model identifier.

    Returns:
        The unchanged *model_key*.

    Raises:
        ValueError: If *model_key* is not a non-empty string made only of
            ASCII letters, digits, ``_`` and ``-``.
    """
    # fullmatch instead of match(r'^...$'): "$" also matches just before a
    # trailing newline, so "abc\n" would otherwise be accepted.
    if not isinstance(model_key, str) or not re.fullmatch(
        r'[a-zA-Z0-9_-]+', model_key
    ):
        raise ValueError("Invalid model key")
    return model_key
# Usage in the constructor and generate_detections_local method
self.model_key = sanitize_model_key(model_key)
model_path = Path('models/pt/') / f"best_{self.model_key}.pt"
src/stream_viewer.py
from urllib.parse import urlparse
class StreamViewer:
    """Show frames from an allow-listed stream URL in an OpenCV window.

    NOTE(review): the class calls ``cv2`` but this snippet does not import
    it; the enclosing module is assumed to provide ``import cv2``.
    """

    # URL schemes a stream may use.
    ALLOWED_SCHEMES = ('http', 'https', 'rtsp')
    # Domains a stream may come from; any subdomain of an entry is accepted.
    ALLOWED_DOMAINS = ('kctmc.nat.gov.tw',)

    def __init__(self, stream_url: str, window_name: str = 'Stream Viewer'):
        """Validate *stream_url* and open a video capture on it.

        Args:
            stream_url: URL of the stream to display.
            window_name: Title of the display window.

        Raises:
            ValueError: If *stream_url* fails ``is_valid_url``.
        """
        if not self.is_valid_url(stream_url):
            raise ValueError(f"Invalid stream URL: {stream_url}")
        self.stream_url = stream_url
        self.window_name = window_name
        self.cap = cv2.VideoCapture(self.stream_url)

    def is_valid_url(self, url: str) -> bool:
        """Return True if *url* uses an allowed scheme and an allowed host.

        A host is allowed when it equals an entry of ``ALLOWED_DOMAINS`` or
        is a subdomain of one. Non-string and unparseable input yields False.
        """
        if not isinstance(url, str):
            return False
        try:
            parsed_url = urlparse(url)
        except ValueError:
            # Malformed authority, e.g. an unterminated "[" host.
            return False
        if parsed_url.scheme not in self.ALLOWED_SCHEMES:
            return False
        hostname = parsed_url.hostname
        if not hostname:
            return False
        # The leading "." keeps "evilkctmc.nat.gov.tw" from matching, while
        # still accepting real subdomains such as "cctv4.kctmc.nat.gov.tw".
        return any(
            hostname == domain or hostname.endswith('.' + domain)
            for domain in self.ALLOWED_DOMAINS
        )

    def display_stream(self):
        """Show frames until 'q' is pressed or a frame cannot be read."""
        try:
            while True:
                ret, frame = self.cap.read()
                if not ret:
                    print('Failed to retrieve frame.')
                    break
                cv2.imshow(self.window_name, frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release the capture and windows even if reading or drawing
            # raises part-way through.
            self.release_resources()

    def release_resources(self):
        """Release the video capture and close all OpenCV windows."""
        self.cap.release()
        cv2.destroyAllWindows()
if __name__ == '__main__':
    # Demo entry point: view a single hard-coded CCTV feed.
    StreamViewer('https://cctv4.kctmc.nat.gov.tw/50204bfc/').display_stream()
src/model_fetcher.py
model_name
from pathlib import Path
import requests
def sanitize_model_name(model_name: str) -> str:
    """Return *model_name* if it is a plain, safe file name.

    Accepted names are one or more dot-separated segments, each made of
    ASCII letters, digits, ``_`` or ``-`` (for example ``best_yolov8l.pt``).
    Path separators, ``..``, leading or trailing dots, whitespace and empty
    strings are all rejected.

    Args:
        model_name: Untrusted file name of the model.

    Returns:
        The unchanged *model_name*.

    Raises:
        ValueError: If *model_name* is not a string of the accepted form.
    """
    # Local import: the module-level imports shown with this snippet do not
    # include ``re``.
    import re

    # The previous check (`not isalnum() and "_" not in name`) accepted any
    # name that merely contained an underscore, e.g. "../../etc/pass_wd".
    if not isinstance(model_name, str) or not re.fullmatch(
        r'[A-Za-z0-9_-]+(?:\.[A-Za-z0-9_-]+)*', model_name
    ):
        raise ValueError("Invalid model name")
    return model_name
def download_model(model_name, url, *, timeout=30):
    """Download *url* to ``models/pt/<model_name>`` unless it already exists.

    Args:
        model_name: File name to store the model under; validated with
            ``sanitize_model_name`` before it is used in a path.
        url: Location to download the model from.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout, ``requests`` can block indefinitely.

    Raises:
        ValueError: If *model_name* is rejected by ``sanitize_model_name``.
    """
    model_name = sanitize_model_name(model_name)
    model_dir = Path('models/pt/')
    model_dir.mkdir(parents=True, exist_ok=True)
    local_file_path = model_dir / model_name

    if local_file_path.exists():
        print(f"'{model_name}' exists. Skipping download.")
        return

    # Stream into a sibling ".part" file and rename only on success, so an
    # interrupted download is never mistaken for a complete model later.
    partial_path = local_file_path.with_name(local_file_path.name + '.part')

    # The context manager returns the connection to the pool even when the
    # body is not fully consumed.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            print(f"Error downloading '{model_name}': {response.status_code}")
            return
        try:
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        except BaseException:
            # Clean up on any interruption (including Ctrl-C), then re-raise.
            partial_path.unlink(missing_ok=True)
            raise

    partial_path.replace(local_file_path)
    print(f"'{model_name}' saved to '{local_file_path}'.")
def main():
    """Fetch every model listed in ``MODEL_URLS`` via ``download_model``."""
    # Maps the local file name of each model to the URL it is fetched from.
    MODEL_URLS = {
        'best_yolov8l.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8l.pt',
        'best_yolov8x.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8x.pt',
    }

    for name_and_url in MODEL_URLS.items():
        download_model(*name_and_url)
# Run the downloads only when executed as a script, not on import.
if __name__ == '__main__':
    main()
For `src/stream_capture.py`:
python src/stream_capture.py --url "http://malicious.url"
For `src/live_stream_detection.py`: pass a malicious `output_folder`, `stream_url`, or `model_key` to the `LiveStreamDetector` class.
detector = LiveStreamDetector(output_folder="../../etc/passwd", stream_url="http://malicious.url", model_key="../../etc/passwd")
For `src/stream_viewer.py`: initialize the `StreamViewer` class with a malicious URL.
viewer = StreamViewer("http://malicious.url")
For `src/model_fetcher.py`: add a malicious `model_name` to the `MODEL_URLS` dictionary.
MODEL_URLS = {'../../etc/passwd': 'http://malicious.url'}
By following these steps, you can replicate the bug and observe the potential security vulnerabilities caused by uncontrolled data used in path expressions.
Click here to create a Pull Request with the proposed solution
Files used for this task:
Tracking issue for: