Closed franferraz98 closed 9 months ago
Did you get it to work?
I did! Just needed to add a Semaphore to the camera manager class instead:
import logging
import time
import numpy as np
from pathlib import Path
from typing import List, Any, Dict
from threading import Semaphore, Thread
from src.interface_cam import Camera
class MulticameraManager:
def __init__(self):
"""
Interface class for managing multicamera deployments
"""
self.camera_list = []
self.semaphore = Semaphore()
self.stream_threads = {}
self.last_pictures = {}
def add_one_camera(self, camera: Camera) -> int:
"""
Adds a camera to the list
:param camera: Camera to add
:return: Index of the camera
"""
self.camera_list.append(camera)
return len(self.camera_list) - 1
def add_many_cameras(self, cameras: List[Camera]) -> int:
"""
Adds a set of cameras to the list.
:param cameras: List of cameras to add
:return: Index of the last camera
"""
self.camera_list.extend(cameras)
return len(self.camera_list) - 1
def configure_one_camera(self, cam_id: int,
config: Dict[str, Any] or Path) -> None:
"""
Configures one camera with the desired parameters via input dict
:param cam_id: Camera id
:param config: Configuration dictionary or path to the
configuration file
"""
self.camera_list[cam_id].configure_camera(config)
def configure_many_camera(self, cam_ids: List[int],
config: Dict[str, Any] or Path) -> None:
"""
Configures one camera with the desired parameters via input dict
:param cam_ids: List of camera ids
:param config: Configuration dictionary or path to the
configuration file
"""
for cam_id in cam_ids:
self.camera_list[cam_id].configure_camera(config)
def configure_all_cameras(self,
config: Dict[str, Any] or Path) -> None:
"""
Configures all cameras on the list with the desired parameters
via input dict
:param config: Configuration dictionary or path to the
configuration file
"""
for camera in self.camera_list:
camera.configure_camera(config)
def start_one_camera(self, cam_id: int) -> None:
"""
Starts one camera by id
:param cam_id: Camera id
"""
self.camera_list[cam_id].start_camera()
def start_many_cameras(self, cam_ids: List[int]) -> None:
"""
Starts a list of cameras by id
:param cam_ids: List of camera ids
"""
for cam_id in cam_ids:
self.camera_list[cam_id].start_camera()
def start_all_cameras(self) -> None:
"""
Starts all cameras on the list
"""
for camera in self.camera_list:
camera.start_camera()
def close_one_camera(self, cam_id: int) -> None:
"""
Closes one camera by id
:param cam_id: Camera id
"""
self.camera_list[cam_id].close()
if cam_id in self.stream_threads:
self.stream_threads[cam_id].join()
def close_many_cameras(self, cam_ids: List[int]) -> None:
"""
Closes a list cameras by id
:param cam_ids: List of camera ids
"""
for cam_id in cam_ids:
self.camera_list[cam_id].close()
def close_all_cameras(self) -> None:
"""
Close all cameras on the list
"""
for camera in self.camera_list:
camera.close()
def take_one_photo(self, cam_id: int) -> np.ndarray:
"""
Takes a photo from the camera from the id
:param cam_id: Camera id
:return: Picure taken
"""
return self.camera_list[cam_id].take_photo()
def take_many_photos(self, cam_ids: List[int]) -> List[np.ndarray]:
"""
Takes a photo from each camera from the id list.
:param cam_ids: List of camera ids
:return: List of picures taken
"""
img_list = []
for cam_id in cam_ids:
img_list.append(self.camera_list[cam_id].take_photo())
return img_list
def take_all_photos(self) -> List[np.ndarray]:
"""
Takes one picture from each camera on the list.
:return: List of pictures taken
"""
img_list = []
for camera in self.camera_list:
img_list.append(camera.take_photo())
return img_list
def get_one_attr(self, cam_id: int, attr_name: str) -> Any:
"""
Gets the attribute of one camera by name
:param cam_id: Camera id
:param attr_name: Name of the attribute to get
:return: The value of the attribute
"""
return self.camera_list[cam_id].get_attr(attr_name)
def get_many_attrs(self, cam_ids: List[int], attr_name: str) -> List[Any]:
"""
Gets the attribute of many cameras by name
:param cam_ids: List of camera ids
:param attr_name: Name of the attribute to get
:return: The value of the attributes for each camera
"""
attr_list = []
for cam_id in cam_ids:
attr_list.append(self.camera_list[cam_id].get_attr(attr_name))
return attr_list
def get_all_attrs(self, attr_name: str) -> List[Any]:
"""
Gets the attribute of all cameras on the list by name.
:param attr_name: Name of the attribute to get
:return: List of values
"""
attr_list = []
for camera in self.camera_list:
attr_list.append(camera.get_attr(attr_name))
return attr_list
def set_one_attr(self, cam_id: int, attr_name: str, attr_value: Any) -> \
None:
"""
Sets the attribute of one camera by name
:param cam_id: Camera id
:param attr_name: Name of the attribute to set
:param attr_value: Value to set
"""
return self.camera_list[cam_id].set_attr(attr_name, attr_value)
def set_many_attrs(self, cam_ids: List[int], attr_name: str,
attr_value: Any) -> List[Any]:
"""
Sets the attribute of many cameras by name
:param cam_ids: List of camera ids
:param attr_name: Name of the attribute to set
:param attr_value: Value to set
"""
attr_list = []
for cam_id in cam_ids:
attr_list.append(self.camera_list[cam_id].set_attr(
attr_name, attr_value))
return attr_list
def set_all_attrs(self, attr_name: str, attr_value: Any) -> List[Any]:
"""
Sets the attribute of all cameras on the list by name.
:param attr_name: Name of the attribute to set
:param attr_value: Value to set
"""
attr_list = []
for camera in self.camera_list:
attr_list.append(camera.set_attr(attr_name, attr_value))
return attr_list
def start_one_video_stream(self, cam_id: int):
"""
Starts the video streaming of one camera by id
:param cam_id: Camera id
:return: True if the thread started properly, false otherwise
"""
camera = self.camera_list[cam_id]
camera.is_streaming = True
cam_thread = Thread(target=self.capture_frame_loop,
args=(camera.cam_id, camera, self.semaphore,))
self.stream_threads[camera.cam_id] = cam_thread
self.stream_threads[camera.cam_id].daemon = True
self.stream_threads[camera.cam_id].start()
def start_many_video_streams(self, cam_ids: List[int]):
"""
Starts the video streaming of many cameras by id
:param cam_ids: List of camera ids
:return: List of rue if the thread started properly, false
otherwise
"""
for cam_id in cam_ids:
camera = self.camera_list[cam_id]
camera.is_streaming = True
cam_thread = Thread(target=self.capture_frame_loop,
args=(camera.cam_id, camera, self.semaphore,))
self.stream_threads[camera.cam_id] = cam_thread
self.stream_threads[camera.cam_id].daemon = True
self.stream_threads[camera.cam_id].start()
def start_all_video_streams(self):
"""
Starts the video streaming of all cameras on the list.
:return: List of true if the thread started properly, false
otherwise
"""
for camera in self.camera_list:
camera.is_streaming = True
cam_thread = Thread(target=self.capture_frame_loop,
args=(camera.cam_id, camera, self.semaphore,))
self.stream_threads[camera.cam_id] = cam_thread
self.stream_threads[camera.cam_id].daemon = True
self.stream_threads[camera.cam_id].start()
def capture_frame_loop(self, cam_id: int, camera: Camera,
semaphore: Semaphore):
while camera.is_streaming:
semaphore.acquire()
last_image = camera.capture_frame()
semaphore.release()
self.last_pictures[cam_id] = last_image
time.sleep(0.00001)
logging.info(f"Exiting capture loop for camera {cam_id}")
def reboot_one(self, cam_id: int) -> None:
"""
Reboots one camera by id
:param cam_id: Camera id
"""
self.camera_list[cam_id].reboot()
def reboot_many(self, cam_ids: List[int]) -> None:
"""
Reboots many cameras by id
:param cam_ids: List of camera ids
"""
for cam_id in cam_ids:
self.camera_list[cam_id].reboot()
def reboot_all(self) -> None:
"""
Reboots all cameras on the list
"""
for camera in self.camera_list:
camera.reboot()
And then
import time
import logging
import numpy as np
from typing import Any, Tuple
from pathlib import Path
from threading import Thread, Semaphore
from abc import ABC, abstractmethod
class Camera(ABC):
def __init__(self, config: dict[str, Any] or Path, cam_id: int = 0):
"""
Generic abstract camera superclass for the different types of
cameras.
:param config: Dictionary containing the configuration variables
of the camera.
:param cam_id: Camera id
"""
self.width = None
self.height = None
self.camera = None
self.is_streaming = False
self.stream_thread = None
self.last_image = None
self.cam_id = cam_id
self.config = config
@abstractmethod
def configure_camera(self, config: dict[str, Any] or Path) -> None:
"""
Configures the camera with the desired parameters via input dict
:param config: Configuration dictionary or path to the
configuration file
"""
@abstractmethod
def start_camera(self) -> None:
"""Initializes the camera."""
@abstractmethod
def get_framerate(self) -> float:
"""
Gets the framerate
:return: Number of frames per second
"""
@abstractmethod
def get_exposure(self) -> float:
"""
Gets the exposure time.
:return: Exposure time, in milliseconds
"""
@abstractmethod
def get_resolution(self) -> Tuple[int, int]:
"""
Gets the camera resolution
:return: Number of pixels in (width, height) format
"""
@abstractmethod
def get_white_balance(self) -> float:
"""
Set white balance.
:return: White balancing ratio
"""
@abstractmethod
def set_framerate(self, framerate: float) -> None:
"""
Sets the framerate
:param framerate: Number of frames per second
:return: True if success, False otherwise
"""
@abstractmethod
def set_exposure(self, exposure: float) -> None:
"""
Gets the exposure time.
:param exposure: Exposure time, in milliseconds.
"""
@abstractmethod
def set_resolution(self, resolution: Tuple[int, int]) -> None:
"""
Sets the camera resolution
:param resolution: Number of pixels in (width, height) format
"""
@abstractmethod
def set_white_balance(self, wbr: float) -> None:
"""
Sets the white balance ratio.
:param wbr: White balance ratio
"""
@abstractmethod
def capture_frame(self) -> np.ndarray:
"""Captures a frame."""
def capture_frame_loop(self) -> None:
"""Loops capture_frame and updates last_image"""
while self.is_streaming:
self.last_image = self.capture_frame()
logging.info("Exiting capture loop")
def is_aquiring(self) -> bool:
"""Retruns true if the streaming is on, false otherwise"""
if self.stream_thread is not None:
return self.stream_thread.is_alive()
else:
return False
def get_last_frame(self) -> np.ndarray:
"""Returns the last frame taken by the camera"""
return self.last_image
def take_photo(self) -> np.ndarray:
"""
Captures a frame with the camera,
counts the time it takes and logs it.
:return: An OpenCV image in RGB format
"""
start_take_photo = time.perf_counter()
frame = self.capture_frame()
t_take_photo = (time.perf_counter() - start_take_photo) * 1000
logging.info(f"Image taken in {t_take_photo:.0f} milliseconds")
return frame
def start_video_stream(self) -> bool:
"""
Starts an inner thread that takes pictures
"""
try:
self.is_streaming = True
self.stream_thread = Thread(target=self.capture_frame_loop,
args=())
self.stream_thread.daemon = True
self.stream_thread.start()
except Exception as e:
logging.error(f"Video stream couldn't be started... \n{e}")
return False
return True
@abstractmethod
def close(self) -> None:
"""Closes the camera."""
def reboot(self) -> None:
"""Reboots the camera."""
self.close()
time.sleep(10)
self.start_camera()
This is working fine aside from the fact that the cameras are a bit slowed down. I'll close the issue now.
Hi,
I'm new to harvesters and I'm trying to make a code that enables me to see the video streaming from two PoE cameras through it. I've sucessfully been able to make the code for streaming with one camera, but at the moment I try to use 2 cameras only one streaming appears and it gets blocked. The final objective of my project is to standarize the usage of multiple libraries for my company, so I'm implementing this in a kind of weird way.
This is how I define my Camera Interface for the different libraries that I want to standardize:
This is how I define the Genicam Class:
This is how I define my multicamera manager:
And this is how I instance and try the classes:
Am I doing something wrong? I even tried adding semaphores to let independence between the pictures, but it gets blocked anyway.
Thanks in advance! EDIT: Simplified the code a bit to make it readable.