You can simply replace the "loading from a list of files" code with grabbing images from a camera via OpenCV: https://docs.opencv.org/4.x/dd/d43/tutorial_py_video_display.html
This should be fairly straightforward - let me know if you have more questions.
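For reference, a rough sketch of that change (not tested): it assumes the estimators and an `estimate_gaze(base_name, frame, dist_coefficients, camera_matrix)` function are already set up as in `estimate_gaze_standalone.py`, and it uses the same crude camera-matrix approximation as that script when no calibration file is available.

```python
import cv2
import numpy as np

# Grab frames from the default webcam (device 0) instead of loading images from disk
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    raise RuntimeError("Cannot open camera")

while True:
    ret, frame = cap.read()  # ret is False when no frame could be read
    if not ret:
        break

    # Crude camera approximation if no calibration file is available
    h, w = frame.shape[:2]
    camera_matrix = np.array([[h, 0.0, w / 2.0], [0.0, h, h / 2.0], [0.0, 0.0, 1.0]])
    dist_coefficients = np.zeros((1, 5))

    # estimate_gaze is assumed to be set up as in estimate_gaze_standalone.py
    estimate_gaze('webcam_frame', frame, dist_coefficients, camera_matrix)

    if cv2.waitKey(1) == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```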
Hey @Tobias-Fischer, thanks for the reply. I was able to test rt_gene on video by passing individual frames to the `estimate_gaze` function, but if I want to do the same with the rt_bene standalone script, I have to pass the left and right eye image file paths separately. Is it possible to extract the left and right eye from a video frame and do blink detection? If so, how can I achieve it?
Yes - you will need to dig a bit through the code, here is a starting point: https://github.com/Tobias-Fischer/rt_gene/blob/aef31be7031f2f93cdd71e603d73375c0fcd4887/rt_gene/src/rt_gene/tracker_generic.py#L28
Hey, thanks again. I managed to build an eye-blink counter on real-time video / a webcam using the code below:
```python
import os
import sys
import time

import cv2
import numpy as np

# Make the rt_gene/rt_bene packages importable before importing from them
sys.path.insert(0, r'..\rt_gene\src')

from rt_gene.extract_landmarks_method_base import LandmarkMethodBase
from rt_bene.estimate_blink_pytorch import BlinkEstimatorPytorch

script_path = r'..\rt_gene_standalone'

landmark_estimator = LandmarkMethodBase(device_id_facedetection='cuda:0',
                                        checkpoint_path_face=os.path.abspath(os.path.join(script_path, "../rt_gene/model_nets/SFD/s3fd_facedetector.pth")),
                                        checkpoint_path_landmark=os.path.abspath(os.path.join(script_path, "../rt_gene/model_nets/phase1_wpdc_vdc.pth.tar")),
                                        model_points_file=os.path.abspath(os.path.join(script_path, "../rt_gene/model_nets/face_model_68.txt")))

blink_estimator = BlinkEstimatorPytorch(device_id_blink="cuda", threshold=0.1,
                                        model_files=[r'C:\research\gaze\rt_gene\rt_gene\model_nets\blink_model_pytorch_vgg16_allsubjects1.model'],
                                        model_type="vgg16")

cap = cv2.VideoCapture(r'video.mp4')
if not cap.isOpened():
    print("Cannot open camera")
    exit()

while True:
    # Capture frame-by-frame
    ret, frame = cap.read()
    # if the frame is read correctly, ret is True
    if not ret:
        print("Can't receive frame (stream end?). Exiting ...")
        break

    color_img = frame
    faceboxes = landmark_estimator.get_face_bb(color_img)

    # Crude camera approximation (no calibration file); only needed for head pose / gaze
    im_width, im_height = frame.shape[1], frame.shape[0]
    _dist_coefficients, _camera_matrix = np.zeros((1, 5)), np.array(
        [[im_height, 0.0, im_width / 2.0], [0.0, im_height, im_height / 2.0], [0.0, 0.0, 1.0]])

    subjects = landmark_estimator.get_subjects_from_faceboxes(color_img, faceboxes)
    for subject in subjects:
        # Crop left/right eye patches from the facial landmarks
        le_c, re_c, le_p, re_p = subject.get_eye_image_from_landmarks(subject, landmark_estimator.eye_image_size)
        if le_c is None or re_c is None:
            continue
        print("Left eye position: ", le_p)
        print("Right eye position: ", re_p)

        l_img_input, r_img_input = blink_estimator.inputs_from_images(le_c, re_c)
        probs = blink_estimator.predict([l_img_input], [r_img_input])
        blinks = probs >= blink_estimator.threshold

        pair_img = np.concatenate((re_c, le_c), axis=1)
        viz_img = blink_estimator.overlay_prediction_over_img(pair_img, blinks)
        cv2.imshow('blink visualisation', viz_img)
        cv2.waitKey(1)

cap.release()
cv2.destroyAllWindows()
```
How can I track where the person is looking (left, right, center, or back)? Which variable do I need to monitor for this?
You can look at the head pose https://github.com/Tobias-Fischer/rt_gene/blob/aef31be7031f2f93cdd71e603d73375c0fcd4887/rt_gene_standalone/estimate_gaze_standalone.py#L80 and eye gaze https://github.com/Tobias-Fischer/rt_gene/blob/aef31be7031f2f93cdd71e603d73375c0fcd4887/rt_gene_standalone/estimate_gaze_standalone.py#L104
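For example, one rough way to turn those values into a left/right/center label. This is only a sketch: it assumes the gaze and head-pose estimates are (theta, phi) angle pairs in radians, as output by the standalone script, with phi being the horizontal (yaw) component; the threshold is hypothetical and must be tuned per camera setup, and the left/right labels may need swapping depending on the sign convention.

```python
def classify_direction(phi_gaze, phi_head, threshold=0.3):
    """Rough left/right/center label from the horizontal (yaw) angles in radians.

    phi_gaze / phi_head: horizontal components of the eye-gaze and head-pose estimates.
    threshold: hypothetical value, tune per setup; swap 'left'/'right' if the sign
    convention is the other way around for your camera.
    """
    phi_total = phi_gaze + phi_head  # combine head yaw and eye yaw
    if phi_total > threshold:
        return 'left'
    if phi_total < -threshold:
        return 'right'
    return 'center'

# Hypothetical usage with the standalone script's outputs, assuming each
# gaze / headpose entry is ordered [theta, phi]:
# direction = classify_direction(gaze[1], headpose[1])
```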
Thanks @Tobias-Fischer, I managed to find the direction based on the head pose and eye gaze values.
When I test gaze estimation and blink prediction on real-time video, the processing is very slow, around 1 FPS; most of the time is spent extracting features using the OpenCV library. Is there a way to improve the processing speed and get a higher FPS?
It should run in real time, the ROS version certainly does. What exactly is slow (which line in the code)?
The code is delayed at this line: `landmark_estimator.get_face_bb(color_img)`. It takes around 0.3 seconds.
It could be that the face detector is not running on GPU? I think the device name is printed, could you check?
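For example, a quick sanity check could look like this (a sketch only: `landmark_estimator` is assumed to be constructed as in the code above, and `sample_frame.jpg` is a hypothetical test image).

```python
import time

import cv2
import torch

# Is CUDA actually visible to PyTorch in this environment?
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("CUDA device:", torch.cuda.get_device_name(0))

# Time a single face-detection call on one frame; note the first call also
# includes model warm-up, so timing a few consecutive frames is more telling.
frame = cv2.imread('sample_frame.jpg')  # hypothetical test frame
start = time.time()
faceboxes = landmark_estimator.get_face_bb(frame)
print("get_face_bb took %.3f s, found %d face box(es)" % (time.time() - start, len(faceboxes)))
```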
Yes, I checked; the device is cuda.
This is my modified code to do gaze estimation and blink detection on real-time video:
```python
#!/usr/bin/env python
# Licensed under Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International (https://creativecommons.org/licenses/by-nc-sa/4.0/legalcode)

from __future__ import print_function, division, absolute_import

import argparse
import os
import sys
import time

import cv2
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm

# Make the rt_gene/rt_bene packages importable before importing from them
sys.path.insert(0, r'C:\research\gaze\rt_gene\rt_gene\src')

from rt_gene.extract_landmarks_method_base import LandmarkMethodBase
from rt_gene.gaze_tools import get_phi_theta_from_euler, limit_yaw
from rt_gene.gaze_tools_standalone import euler_from_matrix
from rt_bene.estimate_blink_pytorch import BlinkEstimatorPytorch

script_path = os.path.dirname(os.path.realpath(__file__))
print("script_path:", script_path)

blink_estimator = BlinkEstimatorPytorch(device_id_blink="cuda", threshold=0.1,
                                        model_files=[r'C:\research\gaze\rt_gene\rt_gene\model_nets\blink_model_pytorch_vgg16_allsubjects1.model'],
                                        model_type="vgg16")

def load_camera_calibration(calibration_file):
    import yaml
    with open(calibration_file, 'r') as f:
        cal = yaml.safe_load(f)

    dist_coefficients = np.array(cal['distortion_coefficients']['data'], dtype='float32').reshape(1, 5)
    camera_matrix = np.array(cal['camera_matrix']['data'], dtype='float32').reshape(3, 3)

    return dist_coefficients, camera_matrix


def extract_eye_image_patches(subjects):
    # Crop the left/right eye patches from the facial landmarks of each subject
    for subject in subjects:
        le_c, re_c, _, _ = subject.get_eye_image_from_landmarks(subject, landmark_estimator.eye_image_size)
        subject.left_eye_color = le_c
        subject.right_eye_color = re_c

def estimate_gaze(base_name, color_img, dist_coefficients, camera_matrix, count_frames):
    fil = open(r'C:\research\gaze\rt_gene\output\%d.txt' % count_frames, 'w')

    stime = time.time()
    faceboxes = landmark_estimator.get_face_bb(color_img)
    print('time taken (face detection):', time.time() - stime)
    if len(faceboxes) == 0:
        tqdm.write('Could not find faces in the image')
        return

    stime = time.time()
    subjects = landmark_estimator.get_subjects_from_faceboxes(color_img, faceboxes)
    print('time taken (landmarks):', time.time() - stime)

    stime = time.time()
    extract_eye_image_patches(subjects)
    print('time taken (eye patches):', time.time() - stime)

    input_r_list = []
    input_l_list = []
    input_head_list = []
    valid_subject_list = []

    for idx, subject in enumerate(subjects):
        if subject.left_eye_color is None or subject.right_eye_color is None:
            tqdm.write('Failed to extract eye image patches')
            continue

        # Blink estimation
        l_img_input, r_img_input = blink_estimator.inputs_from_images(subject.left_eye_color, subject.right_eye_color)
        start_time = time.time()
        probs = blink_estimator.predict([l_img_input], [r_img_input])
        print('time taken (blink):', time.time() - start_time)
        start_time = time.time()
        blinks = probs >= blink_estimator.threshold
        pair_img = np.concatenate((subject.right_eye_color, subject.left_eye_color), axis=1)
        viz_img = blink_estimator.overlay_prediction_over_img(pair_img, blinks)
        cv2.waitKey(1)

        # Head pose estimation via PnP on the facial landmarks
        success, rotation_vector, _ = cv2.solvePnP(landmark_estimator.model_points,
                                                   subject.landmarks.reshape(len(subject.landmarks), 1, 2),
                                                   cameraMatrix=camera_matrix,
                                                   distCoeffs=dist_coefficients, flags=cv2.SOLVEPNP_DLS)

        if not success:
            tqdm.write('Not able to extract head pose for subject {}'.format(idx))
            continue

        _rotation_matrix, _ = cv2.Rodrigues(rotation_vector)
        _rotation_matrix = np.matmul(_rotation_matrix, np.array([[0, 1, 0], [0, 0, -1], [-1, 0, 0]]))
        _m = np.zeros((4, 4))
        _m[:3, :3] = _rotation_matrix
        _m[3, 3] = 1

        # Go from camera space to ROS space
        _camera_to_ros = [[0.0, 0.0, 1.0, 0.0],
                          [-1.0, 0.0, 0.0, 0.0],
                          [0.0, -1.0, 0.0, 0.0],
                          [0.0, 0.0, 0.0, 1.0]]
        roll_pitch_yaw = list(euler_from_matrix(np.dot(_camera_to_ros, _m)))
        roll_pitch_yaw = limit_yaw(roll_pitch_yaw)
        print("roll_pitch_yaw: ", roll_pitch_yaw)
        fil.write(str(roll_pitch_yaw))
        fil.write("\n")

        phi_head, theta_head = get_phi_theta_from_euler(roll_pitch_yaw)

        face_image_resized = cv2.resize(subject.face_color, dsize=(224, 224), interpolation=cv2.INTER_CUBIC)
        head_pose_image = landmark_estimator.visualize_headpose_result(face_image_resized, (phi_head, theta_head))

        if args.vis_headpose:
            plt.axis("off")
            plt.imshow(cv2.cvtColor(head_pose_image, cv2.COLOR_BGR2RGB))
            plt.show()

        if args.save_headpose:
            # add idx to cope with multiple persons in one image
            cv2.imwrite(os.path.join(args.output_path, os.path.splitext(base_name)[0] + '_headpose_%s.jpg' % idx), head_pose_image)

        input_r_list.append(gaze_estimator.input_from_image(subject.right_eye_color))
        input_l_list.append(gaze_estimator.input_from_image(subject.left_eye_color))
        input_head_list.append([theta_head, phi_head])
        valid_subject_list.append(idx)
        print('time taken (head pose):', time.time() - start_time)

    if len(valid_subject_list) == 0:
        return

    # Gaze estimation for all valid subjects
    gaze_est = gaze_estimator.estimate_gaze_twoeyes(inference_input_left_list=input_l_list,
                                                    inference_input_right_list=input_r_list,
                                                    inference_headpose_list=input_head_list)
    print("gaze_est: ", gaze_est)
    fil.write(str(gaze_est))
    fil.close()

    for subject_id, gaze, headpose in zip(valid_subject_list, gaze_est.tolist(), input_head_list):
        subject = subjects[subject_id]
        # Build visualizations
        r_gaze_img = gaze_estimator.visualize_eye_result(subject.right_eye_color, gaze)
        l_gaze_img = gaze_estimator.visualize_eye_result(subject.left_eye_color, gaze)
        s_gaze_img = np.concatenate((r_gaze_img, l_gaze_img), axis=1)

        # Overlay the gaze and blink visualisations on the frame and save it
        color_img[10:50, 10:100] = cv2.resize(cv2.cvtColor(s_gaze_img, cv2.COLOR_BGR2RGB), (90, 40))
        color_img[60:100, 10:100] = cv2.resize(viz_img, (90, 40))
        cv2.imshow("Frame_output", color_img)
        cv2.imwrite(r"C:\research\gaze\rt_gene\output\%d.jpg" % count_frames, color_img)

        if args.save_gaze:
            # add subject_id to cope with multiple persons in one image
            cv2.imwrite(os.path.join(args.output_path, os.path.splitext(base_name)[0] + '_gaze_%s.jpg' % subject_id), s_gaze_img)

        if args.save_estimate:
            # add subject_id to cope with multiple persons in one image
            with open(os.path.join(args.output_path, os.path.splitext(base_name)[0] + '_output_%s.txt' % subject_id), 'w+') as f:
                f.write(os.path.splitext(base_name)[0] + ', [' + str(headpose[1]) + ', ' + str(headpose[0]) + ']' +
                        ', [' + str(gaze[1]) + ', ' + str(gaze[0]) + ']' + '\n')

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Estimate gaze from images')
    parser.add_argument('im_path', type=str, default=os.path.abspath(os.path.join(script_path, './samples_gaze/')),
                        nargs='?', help='Path to an image or a directory containing images')
    parser.add_argument('--calib-file', type=str, dest='calib_file', default=None, help='Camera calibration file')
    parser.add_argument('--vis-headpose', dest='vis_headpose', action='store_true', help='Display the head pose images')
    parser.add_argument('--no-vis-headpose', dest='vis_headpose', action='store_false', help='Do not display the head pose images')
    parser.add_argument('--save-headpose', dest='save_headpose', action='store_true', help='Save the head pose images')
    parser.add_argument('--no-save-headpose', dest='save_headpose', action='store_false', help='Do not save the head pose images')
    parser.add_argument('--vis-gaze', dest='vis_gaze', action='store_true', help='Display the gaze images')
    parser.add_argument('--no-vis-gaze', dest='vis_gaze', action='store_false', help='Do not display the gaze images')
    parser.add_argument('--save-gaze', dest='save_gaze', action='store_true', help='Save the gaze images')
    parser.add_argument('--no-save-gaze', dest='save_gaze', action='store_false', help='Do not save the gaze images')
    parser.add_argument('--save-estimate', dest='save_estimate', action='store_true', help='Save the predictions in a text file')
    parser.add_argument('--gaze_backend', choices=['tensorflow', 'pytorch'], default='tensorflow')
    parser.add_argument('--output_path', type=str, default=os.path.abspath(os.path.join(script_path, './samples_gaze/out')),
                        help='Output directory for head pose and gaze images')
    parser.add_argument('--models', nargs='+', type=str,
                        default=[os.path.abspath(os.path.join(script_path, '../rt_gene/model_nets/Model_allsubjects1.h5'))],
                        help='List of gaze estimators')
    parser.add_argument('--device-id-facedetection', dest="device_id_facedetection", type=str, default='cuda:0',
                        help='Pytorch device id. Set to "cpu:0" to disable cuda')

    parser.set_defaults(vis_gaze=True)
    parser.set_defaults(save_gaze=True)
    parser.set_defaults(vis_headpose=False)
    parser.set_defaults(save_headpose=True)
    parser.set_defaults(save_estimate=False)

    args = parser.parse_args()

    image_path_list = []
    if os.path.isfile(args.im_path):
        image_path_list.append(os.path.split(args.im_path)[1])
        args.im_path = os.path.split(args.im_path)[0]
    elif os.path.isdir(args.im_path):
        for image_file_name in sorted(os.listdir(args.im_path)):
            if image_file_name.lower().endswith('.jpg') or image_file_name.lower().endswith('.png') or image_file_name.lower().endswith('.jpeg'):
                if '_gaze' not in image_file_name and '_headpose' not in image_file_name:
                    image_path_list.append(image_file_name)
    else:
        tqdm.write('Provide either a path to an image or a path to a directory containing images')
        sys.exit(1)

    tqdm.write('Loading networks')
    landmark_estimator = LandmarkMethodBase(device_id_facedetection=args.device_id_facedetection,
                                            checkpoint_path_face=os.path.abspath(os.path.join(script_path, "../rt_gene/model_nets/SFD/s3fd_facedetector.pth")),
                                            checkpoint_path_landmark=os.path.abspath(os.path.join(script_path, "../rt_gene/model_nets/phase1_wpdc_vdc.pth.tar")),
                                            model_points_file=os.path.abspath(os.path.join(script_path, "../rt_gene/model_nets/face_model_68.txt")))

    if args.gaze_backend == "tensorflow":
        from rt_gene.estimate_gaze_tensorflow import GazeEstimator
        gaze_estimator = GazeEstimator("/gpu:0", args.models)
    elif args.gaze_backend == "pytorch":
        from rt_gene.estimate_gaze_pytorch import GazeEstimator
        gaze_estimator = GazeEstimator("cuda:0", args.models)
    else:
        raise ValueError("Incorrect gaze_backend, choices are: tensorflow or pytorch")

    if not os.path.isdir(args.output_path):
        os.makedirs(args.output_path)
    # The original per-image loop over image_path_list is replaced by the video loop below
    count_frames = 0
    cap = cv2.VideoCapture(r'C:\research\DMS\Joes\driver_sleep.mp4')
    print("Video reading started")
    if not cap.isOpened():
        print("Cannot open camera")
        exit()

    while True:
        stime = time.time()
        count_frames += 1
        # Capture frame-by-frame
        ret, frame = cap.read()
        print('time taken (frame grab):', time.time() - stime)
        # if the frame is read correctly, ret is True
        if not ret:
            print("Can't receive frame (stream end?). Exiting ...")
            break

        im_width, im_height = frame.shape[1], frame.shape[0]
        tqdm.write('WARNING!!! You should provide the camera calibration file, otherwise you might get bad results. Using a crude approximation!')
        _dist_coefficients, _camera_matrix = np.zeros((1, 5)), np.array(
            [[im_height, 0.0, im_width / 2.0], [0.0, im_height, im_height / 2.0], [0.0, 0.0, 1.0]])

        stime = time.time()
        estimate_gaze("testing on webcam", frame, _dist_coefficients, _camera_matrix, count_frames)
        print('time taken (estimate_gaze):', time.time() - stime)

        # if cv2.waitKey(1) == ord('q'):
        #     break

    cap.release()
    cv2.destroyAllWindows()
```
Can you check from your end by running this code on a video to figure out what exactly is hindering the speed?
Apologies, but I don't have the time to go through this. Could you try the ROS code, which we know runs in real time?
OK, I'll try using that.
Hey, thanks for making this project. I tested the rt_gene and rt_bene standalone scripts on my Windows system and they work perfectly for the examples in this repo. I just wanted to check the model performance on another video or a live webcam. Is that implemented in this standalone code, or can you guide me on how to do that?