diff --git a/src/face_recognition_system_3.py b/src/face_recognition_system_3.py index a9a7144..6475080 100644 --- a/src/face_recognition_system_3.py +++ b/src/face_recognition_system_3.py @@ -33,7 +33,7 @@ class SingleFaceComparisonSystem: self.target_image_path = None # 相似度阈值 - self.similarity_threshold = 0.25 + self.similarity_threshold = 0.3 # 颜色配置 self.colors = { @@ -141,8 +141,8 @@ class SingleFaceComparisonSystem: """ if similarity >= self.similarity_threshold: return self.colors['match_high'] # 高相似度 - 绿色 - elif similarity >= self.similarity_threshold/2: - return self.colors['match_low'] # 低相似度 - 橙色 + # elif similarity >= self.similarity_threshold/2: + # return self.colors['match_low'] # 低相似度 - 橙色 else: return self.colors['no_match'] # 不匹配 - 红色 @@ -383,8 +383,9 @@ def demo_usage(): os.makedirs("test_data/query", exist_ok=True) os.makedirs("test_data/output", exist_ok=True) - target_image = "test_data/register/person2.jpg" + # target_image = "test_data/register/person2.jpg" # target_image = "test_data/register/person1.png" + target_image = "test_data/register/ztk.jpg" # 设置目标人脸 @@ -431,8 +432,8 @@ def demo_usage(): print("\n=== 批量比对图像 ===") if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0: stats = face_system.batch_process_with_target( - input_dir="test_data/query", - output_dir="test_data/output/batch_comparison" + input_dir="test_data/query/2", + output_dir="test_data/output/batch_comparison/2" ) print(f"批量比对统计:") @@ -447,7 +448,7 @@ def demo_usage(): # 5. 
调整相似度阈值 print("\n=== 调整阈值 ===") - face_system.set_similarity_threshold(0.25) # 提高阈值,更严格 + face_system.set_similarity_threshold(0.3) # 提高阈值,更严格 # 高级使用示例 diff --git a/src/video_face_recognition.py b/src/video_face_recognition.py index 61729f7..95b5be7 100644 --- a/src/video_face_recognition.py +++ b/src/video_face_recognition.py @@ -294,8 +294,8 @@ def main(): video_system = VideoFaceRecognition(use_gpu=True) # 设置目标人脸(可选) - target_image = "test_data/register/person1.png" - # target_image = "test_data/register/cjh.jpg" + # target_image = "test_data/register/person1.png" + target_image = "test_data/register/sy.jpg" if os.path.exists(target_image): video_system.set_target_face(target_image, "目标人物") @@ -310,8 +310,8 @@ def main(): if choice == "1": # 处理视频文件 - video_path = "test_data/video/test_video.mp4" - output_path = "test_data/output_video/zqc.mp4" + video_path = "test_data/video/video_1.mp4" + output_path = "test_data/output_video/video_1.mp4" # 性能优化:跳帧处理 skip_frames = 1 # 每2帧处理1帧,提高速度 diff --git a/src/video_face_recognition_2.py b/src/video_face_recognition_2.py new file mode 100644 index 0000000..dc7a04e --- /dev/null +++ b/src/video_face_recognition_2.py @@ -0,0 +1,466 @@ +# video_face_recognition.py +import cv2 +import numpy as np +import time +from insightface.app import FaceAnalysis +from typing import List, Dict, Tuple +import os + + +class VideoFaceRecognition: + """ + 视频人脸识别系统 + 支持实时视频流和视频文件处理 + """ + + def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True): + # 初始化人脸识别模型 + self.app = FaceAnalysis(name=model_name) + self.app.prepare( + ctx_id=0 if use_gpu else -1, + det_thresh=0.3, + det_size=(640, 640) + ) + + self.target_embedding = None + self.target_id = None + self.similarity_threshold = 0.3 + + # 性能统计 + self.frame_count = 0 + self.processing_times = [] + + print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}") + + def set_target_face(self, image_path: str, person_id: str = "target") -> bool: + """设置目标人脸""" + img = cv2.imread(image_path) + if img 
is None: + print(f"❌ 无法读取目标图像: {image_path}") + return False + + faces = self.app.get(img) + if not faces: + print(f"❌ 目标图像中未检测到人脸: {image_path}") + return False + + self.target_embedding = faces[0].embedding + self.target_id = person_id + print(f"✅ 目标人脸设置: {person_id}") + return True + + def calculate_face_quality(self, face) -> Dict: + """ + 计算人脸质量指标 + """ + quality_metrics = {} + + # 1. 检测置信度 + quality_metrics['det_score'] = float(face.det_score) + + # 2. 人脸姿态角度 (pitch, yaw, roll) + if hasattr(face, 'pose') and face.pose is not None: + pitch, yaw, roll = face.pose + quality_metrics['pitch'] = float(pitch) # 俯仰角 + quality_metrics['yaw'] = float(yaw) # 偏航角 + quality_metrics['roll'] = float(roll) # 翻滚角 + else: + quality_metrics['pitch'] = 0.0 + quality_metrics['yaw'] = 0.0 + quality_metrics['roll'] = 0.0 + + # 3. 人脸边界框信息 + bbox = face.bbox + width = bbox[2] - bbox[0] + height = bbox[3] - bbox[1] + quality_metrics['bbox_area'] = width * height + quality_metrics['aspect_ratio'] = width / height if height > 0 else 0 + + # 4. 关键点质量评估 (基于关键点分布) + if hasattr(face, 'kps') and face.kps is not None: + kps = face.kps + # 计算关键点分布的均匀性 + if len(kps) >= 5: + # 计算关键点之间的平均距离 + distances = [] + for i in range(len(kps)): + for j in range(i + 1, len(kps)): + dist = np.linalg.norm(kps[i] - kps[j]) + distances.append(dist) + if distances: + quality_metrics['kps_variance'] = float(np.var(distances)) + else: + quality_metrics['kps_variance'] = 0.0 + else: + quality_metrics['kps_variance'] = 0.0 + else: + quality_metrics['kps_variance'] = 0.0 + + # 5. 
综合质量评分 + # 基于检测得分、姿态角度、边界框大小等因素 + base_score = quality_metrics['det_score'] + + # 姿态惩罚 - 角度越大质量分越低 + pose_penalty = 0.0 + if abs(quality_metrics['yaw']) > 30: # 偏航角大于30度惩罚 + pose_penalty += 0.2 + if abs(quality_metrics['pitch']) > 20: # 俯仰角大于20度惩罚 + pose_penalty += 0.2 + + quality_metrics['quality_score'] = max(0.1, base_score - pose_penalty) + + return quality_metrics + + def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]: + """ + 处理单帧图像 + 返回: (处理后的帧, 识别结果列表) + """ + start_time = time.time() + + # 人脸检测和识别 + faces = self.app.get(frame) + + results = [] + for face in faces: + similarity = 0.0 + if self.target_embedding is not None: + # 计算相似度 + emb1 = face.embedding / np.linalg.norm(face.embedding) + emb2 = self.target_embedding / np.linalg.norm(self.target_embedding) + similarity = float(np.dot(emb1, emb2)) + + # 计算人脸质量指标 + quality_metrics = self.calculate_face_quality(face) + + result = { + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'is_match': similarity >= self.similarity_threshold, + 'gender': 'Male' if face.gender == 1 else 'Female', + 'age': int(face.age), + 'det_score': float(face.det_score), + 'quality_metrics': quality_metrics # 添加质量指标 + } + results.append(result) + + # 在帧上绘制结果 + frame = self._draw_detection(frame, result) + + # 性能统计 + processing_time = (time.time() - start_time) * 1000 + self.processing_times.append(processing_time) + self.frame_count += 1 + + return frame, results + + def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray: + """在帧上绘制检测结果和质量信息""" + bbox = result['bbox'] + similarity = result['similarity'] + is_match = result['is_match'] + quality_metrics = result['quality_metrics'] + + # 选择颜色 + if is_match: + color = (0, 255, 0) # 绿色 - 匹配 + else: + color = (0, 0, 255) # 红色 - 不匹配 + + # 绘制人脸框 + x1, y1, x2, y2 = bbox + cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) + + # 准备显示文本 + text_lines = [] + + # 第一行:匹配状态和相似度 + if self.target_id: + status = f"MATCH: 
{similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}" + text_lines.append(status) + else: + text_lines.append(f"Similarity: {similarity:.3f}") + + # 第二行:基础信息 + text_lines.append(f"{result['gender']}/{result['age']}") + + # 第三行:质量得分和检测得分 + text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}") + text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}") + + # 第四行:姿态角度 + text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°") + text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°") + text_lines.append(f"Roll: {quality_metrics['roll']:.1f}°") + + # 第五行:其他质量指标 + text_lines.append(f"Area: {quality_metrics['bbox_area']:.0f}") + text_lines.append(f"Aspect: {quality_metrics['aspect_ratio']:.2f}") + + # 计算文本区域大小 + max_text_width = 0 + total_text_height = 0 + line_heights = [] + + for line in text_lines: + (text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1) + max_text_width = max(max_text_width, text_width) + line_heights.append(text_height + baseline) + total_text_height += text_height + baseline + 2 # 2像素行间距 + + # 绘制文本背景 + bg_x1 = x1 + bg_y1 = y1 - total_text_height - 10 + bg_x2 = x1 + max_text_width + 10 + bg_y2 = y1 + + # 如果背景超出图像顶部,调整到框下方 + if bg_y1 < 0: + bg_y1 = y2 + bg_y2 = y2 + total_text_height + 10 + + # 绘制半透明背景 + overlay = frame.copy() + cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1) + alpha = 0.6 # 透明度 + cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame) + + # 绘制文本 + current_y = bg_y1 + 15 + for i, line in enumerate(text_lines): + # 根据内容选择颜色 + if i == 0: # 匹配状态行 + text_color = (0, 255, 0) if is_match else (0, 0, 255) + elif i in [2, 3]: # 质量得分行 + # 根据质量得分调整颜色 + quality = quality_metrics['quality_score'] + if quality > 0.7: + text_color = (0, 255, 0) # 绿色 - 高质量 + elif quality > 0.4: + text_color = (0, 255, 255) # 黄色 - 中等质量 + else: + text_color = (0, 0, 255) # 红色 - 低质量 + elif i in [4, 5, 6]: # 姿态角度行 + # 根据角度大小调整颜色 + if abs(quality_metrics['yaw']) > 45 
or abs(quality_metrics['pitch']) > 30: + text_color = (0, 0, 255) # 红色 - 角度过大 + elif abs(quality_metrics['yaw']) > 30 or abs(quality_metrics['pitch']) > 20: + text_color = (0, 255, 255) # 黄色 - 角度偏大 + else: + text_color = (0, 255, 0) # 绿色 - 角度良好 + else: + text_color = (255, 255, 255) # 白色 - 普通信息 + + cv2.putText(frame, line, (x1 + 5, current_y), + cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1) + current_y += line_heights[i] + + return frame + + def process_video_file(self, video_path: str, output_path: str = None, + skip_frames: int = 0, show_preview: bool = True): + """ + 处理视频文件 + + Args: + video_path: 输入视频路径 + output_path: 输出视频路径 + skip_frames: 跳帧数,用于提高处理速度 + show_preview: 是否显示实时预览 + """ + if not os.path.exists(video_path): + print(f"❌ 视频文件不存在: {video_path}") + return + + # 打开视频文件 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + print(f"❌ 无法打开视频文件: {video_path}") + return + + # 获取视频信息 + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + + print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}") + + # 设置输出视频 + if output_path: + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height)) + else: + out = None + + # 处理视频帧 + frame_index = 0 + processed_frames = 0 + start_time = time.time() + + print("🚀 开始处理视频...") + + while True: + ret, frame = cap.read() + if not ret: + break + + # 跳帧处理 + if skip_frames > 0 and frame_index % (skip_frames + 1) != 0: + frame_index += 1 + continue + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 写入输出视频 + if out: + out.write(processed_frame) + + # 显示预览 + if show_preview: + # 添加性能信息 + fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)}" + cv2.putText(processed_frame, fps_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + cv2.imshow('Video Face 
Recognition', processed_frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + frame_index += 1 + processed_frames += 1 + + # 进度显示 + if frame_index % 30 == 0: + progress = (frame_index / total_frames) * 100 + print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})") + + # 清理资源 + cap.release() + if out: + out.release() + if show_preview: + cv2.destroyAllWindows() + + # 性能统计 + total_time = time.time() - start_time + avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0 + + print(f"\n🎉 视频处理完成!") + print(f"📊 性能统计:") + print(f" 总处理帧数: {processed_frames}") + print(f" 总耗时: {total_time:.1f}秒") + print(f" 平均每帧: {avg_processing_time:.1f}ms") + print(f" 实际FPS: {processed_frames / total_time:.1f}") + if output_path: + print(f" 输出视频: {output_path}") + + def process_webcam(self, camera_id: int = 0, output_path: str = None): + """ + 处理摄像头实时视频流 + """ + cap = cv2.VideoCapture(camera_id) + if not cap.isOpened(): + print(f"❌ 无法打开摄像头 {camera_id}") + return + + # 设置摄像头分辨率(可选) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) + + # 设置输出视频 + if output_path: + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + else: + out = None + + print("🎥 开始摄像头实时识别 (按 'q' 退出)...") + + while True: + ret, frame = cap.read() + if not ret: + print("❌ 无法读取摄像头帧") + break + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 添加实时信息 + current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0 + info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}" + cv2.putText(processed_frame, info_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 写入输出 + if out: + out.write(processed_frame) + + # 显示预览 + cv2.imshow('Real-time Face Recognition', processed_frame) + + # 按'q'退出 + if cv2.waitKey(1) 
& 0xFF == ord('q'): + break + + # 清理资源 + cap.release() + if out: + out.release() + cv2.destroyAllWindows() + + print("✅ 摄像头处理结束") + + +# 使用示例 +def main(): + # 创建视频识别系统 + video_system = VideoFaceRecognition(use_gpu=True) + + # 设置目标人脸(可选) + target_image = "test_data/register/sy.jpg" + + if os.path.exists(target_image): + video_system.set_target_face(target_image, "目标人物") + + # 选择处理模式 + print("请选择处理模式:") + print("1. 处理视频文件") + print("2. 实时摄像头") + + choice = input("请输入选择 (1 或 2): ").strip() + + if choice == "1": + # 处理视频文件 + video_path = "test_data/video/video_1.mp4" + output_path = "test_data/output_video/video_1_quality.mp4" + + # 性能优化:跳帧处理 + skip_frames = 1 # 每2帧处理1帧,提高速度 + + video_system.process_video_file( + video_path=video_path, + output_path=output_path, + skip_frames=skip_frames, + show_preview=True + ) + + elif choice == "2": + # 实时摄像头 + output_path = "webcam_recording.mp4" # 可选:保存录制 + + video_system.process_webcam( + camera_id=0, + output_path=output_path + ) + + else: + print("❌ 无效选择") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/video_face_recognition_3.py b/src/video_face_recognition_3.py new file mode 100644 index 0000000..b949c71 --- /dev/null +++ b/src/video_face_recognition_3.py @@ -0,0 +1,541 @@ +# video_face_recognition.py +import cv2 +import numpy as np +import time +from insightface.app import FaceAnalysis +from typing import List, Dict, Tuple +import os + + +class VideoFaceRecognition: + """ + 视频人脸识别系统 + 支持实时视频流和视频文件处理 + """ + + def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True): + # 初始化人脸识别模型 + self.app = FaceAnalysis(name=model_name) + self.app.prepare( + ctx_id=0 if use_gpu else -1, + det_thresh=0.2, + det_size=(640, 640) + ) + + self.target_embedding = None + self.target_id = None + self.similarity_threshold = 0.3 + + # 质量阈值设置 + self.clarity_threshold = 50.0 # 清晰度阈值,低于此值认为人脸模糊 + self.min_face_size = 40 # 最小人脸像素尺寸 + + # 性能统计 + self.frame_count = 0 + self.processing_times = [] + + 
print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}") + + def set_target_face(self, image_path: str, person_id: str = "target") -> bool: + """设置目标人脸""" + img = cv2.imread(image_path) + if img is None: + print(f"❌ 无法读取目标图像: {image_path}") + return False + + faces = self.app.get(img) + if not faces: + print(f"❌ 目标图像中未检测到人脸: {image_path}") + return False + + self.target_embedding = faces[0].embedding + self.target_id = person_id + print(f"✅ 目标人脸设置: {person_id}") + return True + + def calculate_clarity(self, face_region: np.ndarray) -> float: + """ + 计算人脸区域的清晰度/模糊度 + 使用拉普拉斯方差方法:值越高表示图像越清晰 + """ + if len(face_region.shape) == 3: + gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY) + else: + gray = face_region + + # 计算拉普拉斯算子的方差 + laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var() + return laplacian_var + + def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]: + """ + 综合判断人脸质量是否可接受 + 返回: (是否可接受, 质量指标字典) + """ + quality_metrics = {} + + # 1. 检测置信度 + quality_metrics['det_score'] = float(face.det_score) + + # 2. 人脸姿态角度 + if hasattr(face, 'pose') and face.pose is not None: + pitch, yaw, roll = face.pose + quality_metrics['pitch'] = float(pitch) + quality_metrics['yaw'] = float(yaw) + quality_metrics['roll'] = float(roll) + else: + quality_metrics['pitch'] = 0.0 + quality_metrics['yaw'] = 0.0 + quality_metrics['roll'] = 0.0 + + # 3. 人脸边界框信息 + bbox = face.bbox + x1, y1, x2, y2 = bbox.astype(int) + width = x2 - x1 + height = y2 - y1 + quality_metrics['bbox_area'] = width * height + quality_metrics['aspect_ratio'] = width / height if height > 0 else 0 + + # 4. 图像清晰度检测 + # 提取人脸区域 + h, w = frame.shape[:2] + x1_clip = max(0, x1) + y1_clip = max(0, y1) + x2_clip = min(w, x2) + y2_clip = min(h, y2) + + if x2_clip > x1_clip and y2_clip > y1_clip: + face_region = frame[y1_clip:y2_clip, x1_clip:x2_clip] + clarity_score = self.calculate_clarity(face_region) + quality_metrics['clarity_score'] = clarity_score + else: + quality_metrics['clarity_score'] = 0.0 + + # 5. 
关键点质量评估 + if hasattr(face, 'kps') and face.kps is not None: + kps = face.kps + if len(kps) >= 5: + distances = [] + for i in range(len(kps)): + for j in range(i + 1, len(kps)): + dist = np.linalg.norm(kps[i] - kps[j]) + distances.append(dist) + if distances: + quality_metrics['kps_variance'] = float(np.var(distances)) + else: + quality_metrics['kps_variance'] = 0.0 + else: + quality_metrics['kps_variance'] = 0.0 + else: + quality_metrics['kps_variance'] = 0.0 + + # 6. 综合质量评分 + base_score = quality_metrics['det_score'] + + # 清晰度惩罚 + clarity_penalty = 0.0 + if quality_metrics['clarity_score'] < self.clarity_threshold: + clarity_penalty = 0.3 # 清晰度不足严重惩罚 + + # 姿态惩罚 + pose_penalty = 0.0 + if abs(quality_metrics['yaw']) > 30: + pose_penalty += 0.2 + if abs(quality_metrics['pitch']) > 20: + pose_penalty += 0.2 + + # 尺寸惩罚 + size_penalty = 0.0 + if quality_metrics['bbox_area'] < (self.min_face_size ** 2): + size_penalty = 0.2 + + quality_metrics['quality_score'] = max(0.1, base_score - clarity_penalty - pose_penalty - size_penalty) + + # 判断是否可接受 + is_acceptable = ( + quality_metrics['det_score'] > 0.5 and # 基础检测置信度 + quality_metrics['clarity_score'] >= self.clarity_threshold and # 清晰度要求 + quality_metrics['bbox_area'] >= (self.min_face_size ** 2) and # 最小尺寸要求 + abs(quality_metrics['yaw']) < 60 and # 偏航角限制 + abs(quality_metrics['pitch']) < 45 # 俯仰角限制 + ) + + return is_acceptable, quality_metrics + + def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]: + """ + 处理单帧图像 + 返回: (处理后的帧, 识别结果列表) + """ + start_time = time.time() + + # 人脸检测和识别 + faces = self.app.get(frame) + + results = [] + for face in faces: + # 检查人脸质量是否可接受 + is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame) + + similarity = 0.0 + # 只有在人脸质量可接受时才计算相似度 + if is_acceptable and self.target_embedding is not None: + emb1 = face.embedding / np.linalg.norm(face.embedding) + emb2 = self.target_embedding / np.linalg.norm(self.target_embedding) + similarity = 
float(np.dot(emb1, emb2)) + + result = { + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'is_match': similarity >= self.similarity_threshold, + 'gender': 'Male' if face.gender == 1 else 'Female', + 'age': int(face.age), + 'det_score': float(face.det_score), + 'quality_metrics': quality_metrics, + 'is_acceptable': is_acceptable # 新增:是否可接受标志 + } + results.append(result) + + # 在帧上绘制结果 + frame = self._draw_detection(frame, result) + + # 性能统计 + processing_time = (time.time() - start_time) * 1000 + self.processing_times.append(processing_time) + self.frame_count += 1 + + return frame, results + + def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray: + """在帧上绘制检测结果和质量信息""" + bbox = result['bbox'] + similarity = result['similarity'] + is_match = result['is_match'] + is_acceptable = result['is_acceptable'] + quality_metrics = result['quality_metrics'] + + # 选择颜色 + if not is_acceptable: + color = (128, 128, 128) # 灰色 - 质量不可接受 + elif is_match: + color = (0, 255, 0) # 绿色 - 匹配 + else: + color = (0, 0, 255) # 红色 - 不匹配 + + # 绘制人脸框 + x1, y1, x2, y2 = bbox + cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) + + # 准备显示文本 + text_lines = [] + + # 第一行:质量状态和匹配状态 + if not is_acceptable: + text_lines.append("LOW QUALITY") + elif self.target_id: + status = f"MATCH: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}" + text_lines.append(status) + else: + text_lines.append(f"Similarity: {similarity:.3f}") + + # 第二行:基础信息 + text_lines.append(f"{result['gender']}/{result['age']}") + + # 第三行:质量得分 + text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}") + text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}") + + # 第四行:清晰度(关键指标) + clarity_status = "CLEAR" if quality_metrics['clarity_score'] >= self.clarity_threshold else "BLUR" + text_lines.append(f"Clarity: {clarity_status} ({quality_metrics['clarity_score']:.1f})") + + # 第五行:姿态角度 + text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°") + 
text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°") + text_lines.append(f"Roll: {quality_metrics['roll']:.1f}°") + + # 第六行:其他质量指标 + text_lines.append(f"Area: {quality_metrics['bbox_area']:.0f}") + + # 计算文本区域大小 + max_text_width = 0 + total_text_height = 0 + line_heights = [] + + for line in text_lines: + (text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1) + max_text_width = max(max_text_width, text_width) + line_heights.append(text_height + baseline) + total_text_height += text_height + baseline + 2 + + # 绘制文本背景 + bg_x1 = x1 + bg_y1 = y1 - total_text_height - 10 + bg_x2 = x1 + max_text_width + 10 + bg_y2 = y1 + + # 如果背景超出图像顶部,调整到框下方 + if bg_y1 < 0: + bg_y1 = y2 + bg_y2 = y2 + total_text_height + 10 + + # 绘制半透明背景 + overlay = frame.copy() + cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1) + alpha = 0.6 + cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame) + + # 绘制文本 + current_y = bg_y1 + 15 + for i, line in enumerate(text_lines): + # 根据内容选择颜色 + if i == 0: # 状态行 + if not is_acceptable: + text_color = (128, 128, 128) # 灰色 - 质量差 + elif is_match: + text_color = (0, 255, 0) # 绿色 - 匹配 + else: + text_color = (0, 0, 255) # 红色 - 不匹配 + elif i == 3: # 清晰度行 + if quality_metrics['clarity_score'] >= self.clarity_threshold: + text_color = (0, 255, 0) # 绿色 - 清晰 + else: + text_color = (0, 0, 255) # 红色 - 模糊 + elif i in [4, 5, 6]: # 姿态角度行 + if abs(quality_metrics['yaw']) > 45 or abs(quality_metrics['pitch']) > 30: + text_color = (0, 0, 255) # 红色 - 角度过大 + elif abs(quality_metrics['yaw']) > 30 or abs(quality_metrics['pitch']) > 20: + text_color = (0, 255, 255) # 黄色 - 角度偏大 + else: + text_color = (0, 255, 0) # 绿色 - 角度良好 + else: + text_color = (255, 255, 255) # 白色 + + cv2.putText(frame, line, (x1 + 5, current_y), + cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1) + current_y += line_heights[i] + + return frame + + def set_quality_thresholds(self, clarity_threshold: float = None, min_face_size: int = None): + 
"""设置质量阈值""" + if clarity_threshold is not None: + self.clarity_threshold = clarity_threshold + if min_face_size is not None: + self.min_face_size = min_face_size + print(f"✅ 质量阈值更新 - 清晰度: {self.clarity_threshold}, 最小尺寸: {self.min_face_size}") + + # 原有的 process_video_file 和 process_webcam 方法保持不变 + def process_video_file(self, video_path: str, output_path: str = None, + skip_frames: int = 0, show_preview: bool = True): + """ + 处理视频文件 + + Args: + video_path: 输入视频路径 + output_path: 输出视频路径 + skip_frames: 跳帧数,用于提高处理速度 + show_preview: 是否显示实时预览 + """ + if not os.path.exists(video_path): + print(f"❌ 视频文件不存在: {video_path}") + return + + # 打开视频文件 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + print(f"❌ 无法打开视频文件: {video_path}") + return + + # 获取视频信息 + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + + print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}") + + # 设置输出视频 + if output_path: + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height)) + else: + out = None + + # 处理视频帧 + frame_index = 0 + processed_frames = 0 + start_time = time.time() + + print("🚀 开始处理视频...") + + while True: + ret, frame = cap.read() + if not ret: + break + + # 跳帧处理 + if skip_frames > 0 and frame_index % (skip_frames + 1) != 0: + frame_index += 1 + continue + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 写入输出视频 + if out: + out.write(processed_frame) + + # 显示预览 + if show_preview: + # 添加性能信息 + fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)}" + cv2.putText(processed_frame, fps_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + cv2.imshow('Video Face Recognition', processed_frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + frame_index += 1 + processed_frames += 1 + + # 进度显示 + if 
frame_index % 30 == 0: + progress = (frame_index / total_frames) * 100 + print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})") + + # 清理资源 + cap.release() + if out: + out.release() + if show_preview: + cv2.destroyAllWindows() + + # 性能统计 + total_time = time.time() - start_time + avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0 + + print(f"\n🎉 视频处理完成!") + print(f"📊 性能统计:") + print(f" 总处理帧数: {processed_frames}") + print(f" 总耗时: {total_time:.1f}秒") + print(f" 平均每帧: {avg_processing_time:.1f}ms") + print(f" 实际FPS: {processed_frames / total_time:.1f}") + if output_path: + print(f" 输出视频: {output_path}") + + def process_webcam(self, camera_id: int = 0, output_path: str = None): + """ + 处理摄像头实时视频流 + """ + cap = cv2.VideoCapture(camera_id) + if not cap.isOpened(): + print(f"❌ 无法打开摄像头 {camera_id}") + return + + # 设置摄像头分辨率(可选) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) + + # 设置输出视频 + if output_path: + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + else: + out = None + + print("🎥 开始摄像头实时识别 (按 'q' 退出)...") + + while True: + ret, frame = cap.read() + if not ret: + print("❌ 无法读取摄像头帧") + break + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 添加实时信息 + current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0 + info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}" + cv2.putText(processed_frame, info_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 写入输出 + if out: + out.write(processed_frame) + + # 显示预览 + cv2.imshow('Real-time Face Recognition', processed_frame) + + # 按'q'退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + # 清理资源 + cap.release() + if out: + out.release() + cv2.destroyAllWindows() + + print("✅ 摄像头处理结束") + + +# 
使用示例 +def main(): + # 创建视频识别系统 + video_system = VideoFaceRecognition(use_gpu=True) + + # 设置质量阈值(可根据实际情况调整) + video_system.set_quality_thresholds( + clarity_threshold=50.0, # 清晰度阈值,可能需要根据你的视频调整 + min_face_size=40 + ) + + # 设置目标人脸(可选) + target_image = "test_data/register/sy.jpg" + + if os.path.exists(target_image): + video_system.set_target_face(target_image, "目标人物") + + # 选择处理模式 + print("请选择处理模式:") + print("1. 处理视频文件") + print("2. 实时摄像头") + + choice = input("请输入选择 (1 或 2): ").strip() + + if choice == "1": + # 处理视频文件 + video_path = "test_data/video/video_1.mp4" + output_path = "test_data/output_video/video_5_quality.mp4" + + # 性能优化:跳帧处理 + skip_frames = 1 + + video_system.process_video_file( + video_path=video_path, + output_path=output_path, + skip_frames=skip_frames, + show_preview=True + ) + + elif choice == "2": + # 实时摄像头 + output_path = "webcam_recording.mp4" + + video_system.process_webcam( + camera_id=0, + output_path=output_path + ) + + else: + print("❌ 无效选择") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/video_face_recognition_5.py b/src/video_face_recognition_5.py new file mode 100644 index 0000000..e134132 --- /dev/null +++ b/src/video_face_recognition_5.py @@ -0,0 +1,439 @@ +# video_face_recognition.py +import cv2 +import numpy as np +import time +from insightface.app import FaceAnalysis +from typing import List, Dict, Tuple, Optional +import os +import glob + +#黑白名单 + +class VideoFaceRecognition: + """ + 视频人脸识别系统 + 支持实时视频流和视频文件处理 + 支持黑名单和白名单模式 + """ + + def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True): + # 初始化人脸识别模型 + self.app = FaceAnalysis(name=model_name) + self.app.prepare( + ctx_id=0 if use_gpu else -1, + det_thresh=0.39, + det_size=(640, 640) + ) + + # 名单相关变量 + self.list_mode = "blacklist" # "blacklist" 或 "whitelist" + self.registered_faces = {} # {name: embedding} + self.similarity_threshold = 0.3 + + # 性能统计 + self.frame_count = 0 + self.processing_times = [] + + print(f"✅ 视频人脸识别系统初始化完成 
- GPU: {use_gpu}") + + def set_list_mode(self, mode: str): + """设置名单模式""" + if mode.lower() in ["blacklist", "whitelist"]: + self.list_mode = mode.lower() + print(f"✅ 名单模式设置为: {self.list_mode}") + else: + print("❌ 无效的名单模式,请使用 'blacklist' 或 'whitelist'") + + def load_registered_faces(self, register_dir: str): + """ + 从目录加载注册的人脸图片 + 文件名(去掉后缀)即为人的名字 + """ + if not os.path.exists(register_dir): + print(f"❌ 注册目录不存在: {register_dir}") + return False + + # 支持的图片格式 + image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp'] + image_files = [] + + for ext in image_extensions: + image_files.extend(glob.glob(os.path.join(register_dir, ext))) + image_files.extend(glob.glob(os.path.join(register_dir, ext.upper()))) + + if not image_files: + print(f"❌ 在目录 {register_dir} 中未找到图片文件") + return False + + loaded_count = 0 + for image_path in image_files: + # 获取文件名(不含扩展名)作为人名 + person_name = os.path.splitext(os.path.basename(image_path))[0] + + # 读取图片并提取人脸特征 + img = cv2.imread(image_path) + if img is None: + print(f"❌ 无法读取图片: {image_path}") + continue + + faces = self.app.get(img) + if not faces: + print(f"❌ 图片中未检测到人脸: {image_path}") + continue + + # 使用第一张检测到的人脸 + self.registered_faces[person_name] = faces[0].embedding + loaded_count += 1 + print(f"✅ 加载注册人脸: {person_name}") + + print(f"🎉 成功加载 {loaded_count} 张注册人脸") + return loaded_count > 0 + + def find_best_match(self, embedding: np.ndarray) -> Tuple[Optional[str], float]: + """ + 在注册人脸中查找最佳匹配 + 返回: (匹配的人名, 相似度) + """ + if not self.registered_faces: + return None, 0.0 + + best_similarity = 0.0 + best_name = None + + # 归一化查询嵌入 + query_emb = embedding / np.linalg.norm(embedding) + + for name, registered_embedding in self.registered_faces.items(): + # 归一化注册嵌入 + reg_emb = registered_embedding / np.linalg.norm(registered_embedding) + + # 计算余弦相似度 + similarity = float(np.dot(query_emb, reg_emb)) + + if similarity > best_similarity: + best_similarity = similarity + best_name = name + + return best_name, best_similarity + + def 
process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]: + """ + 处理单帧图像 + 返回: (处理后的帧, 识别结果列表) + """ + start_time = time.time() + + # 人脸检测和识别 + faces = self.app.get(frame) + + results = [] + for face in faces: + # 查找最佳匹配 + best_name, similarity = self.find_best_match(face.embedding) + + # 根据名单模式判断是否匹配 + if self.list_mode == "blacklist": + # 黑名单模式:在黑名单中即为匹配(需要关注) + is_match = best_name is not None and similarity >= self.similarity_threshold + else: # whitelist + # 白名单模式:在白名单中即为匹配(允许通过) + is_match = best_name is not None and similarity >= self.similarity_threshold + + result = { + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'best_match': best_name, + 'is_match': is_match, + 'gender': 'Male' if face.gender == 1 else 'Female', + 'age': int(face.age), + 'det_score': float(face.det_score) + } + results.append(result) + + # 在帧上绘制结果 + frame = self._draw_detection(frame, result) + + # 性能统计 + processing_time = (time.time() - start_time) * 1000 + self.processing_times.append(processing_time) + self.frame_count += 1 + + return frame, results + + def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray: + """在帧上绘制检测结果""" + bbox = result['bbox'] + similarity = result['similarity'] + is_match = result['is_match'] + best_match = result['best_match'] + + # 选择颜色 - 根据名单模式 + if self.list_mode == "blacklist": + # 黑名单模式:匹配(在黑名单中)显示红色,不匹配显示绿色 + color = (0, 0, 255) if is_match else (0, 255, 0) # 红色-黑名单, 绿色-正常 + else: # whitelist + # 白名单模式:匹配(在白名单中)显示绿色,不匹配显示红色 + color = (0, 255, 0) if is_match else (0, 0, 255) # 绿色-白名单, 红色-陌生人 + + # 绘制人脸框 + x1, y1, x2, y2 = bbox + cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) + + # 构建显示文本 + if best_match and similarity >= self.similarity_threshold: + name_text = f"{best_match}: {similarity:.3f}" + else: + name_text = f"Unknown: {similarity:.3f}" + + # 添加名单状态 + status = "MATCH" if is_match else "NO MATCH" + text = f"{status} | {name_text}" + + # 文本背景 + text_size = cv2.getTextSize(text, 
cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0] + cv2.rectangle(frame, (x1, y1 - text_size[1] - 10), + (x1 + text_size[0], y1), color, -1) + + # 文本 + cv2.putText(frame, text, (x1, y1 - 5), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) + + # 详细信息 + info_text = f"{result['gender']}/{result['age']}" + cv2.putText(frame, info_text, (x1, y2 + 20), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + + return frame + + def process_video_file(self, video_path: str, output_path: str = None, + skip_frames: int = 0, show_preview: bool = True): + """ + 处理视频文件 + + Args: + video_path: 输入视频路径 + output_path: 输出视频路径 + skip_frames: 跳帧数,用于提高处理速度 + show_preview: 是否显示实时预览 + """ + if not os.path.exists(video_path): + print(f"❌ 视频文件不存在: {video_path}") + return + + # 打开视频文件 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + print(f"❌ 无法打开视频文件: {video_path}") + return + + # 获取视频信息 + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + + print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}") + print(f"🎯 当前模式: {self.list_mode}, 注册人脸数: {len(self.registered_faces)}") + + # 设置输出视频 + if output_path: + # 确保输出目录存在 + os.makedirs(os.path.dirname(output_path), exist_ok=True) + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height)) + else: + out = None + + # 处理视频帧 + frame_index = 0 + processed_frames = 0 + start_time = time.time() + + print("🚀 开始处理视频...") + + while True: + ret, frame = cap.read() + if not ret: + break + + # 跳帧处理 + if skip_frames > 0 and frame_index % (skip_frames + 1) != 0: + frame_index += 1 + continue + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 写入输出视频 + if out: + out.write(processed_frame) + + # 显示预览 + if show_preview: + # 添加性能信息 + fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)} | Mode: 
{self.list_mode}" + cv2.putText(processed_frame, fps_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 添加名单统计 + match_count = sum(1 for r in results if r['is_match']) + list_text = f"Match: {match_count}/{len(results)}" + cv2.putText(processed_frame, list_text, (10, 60), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + cv2.imshow('Video Face Recognition', processed_frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + frame_index += 1 + processed_frames += 1 + + # 进度显示 + if frame_index % 30 == 0: + progress = (frame_index / total_frames) * 100 + print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})") + + # 清理资源 + cap.release() + if out: + out.release() + if show_preview: + cv2.destroyAllWindows() + + # 性能统计 + total_time = time.time() - start_time + avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0 + + print(f"\n🎉 视频处理完成!") + print(f"📊 性能统计:") + print(f" 总处理帧数: {processed_frames}") + print(f" 总耗时: {total_time:.1f}秒") + print(f" 平均每帧: {avg_processing_time:.1f}ms") + print(f" 实际FPS: {processed_frames / total_time:.1f}") + if output_path: + print(f" 输出视频: {output_path}") + + def process_webcam(self, camera_id: int = 0, output_path: str = None): + """ + 处理摄像头实时视频流 + """ + cap = cv2.VideoCapture(camera_id) + if not cap.isOpened(): + print(f"❌ 无法打开摄像头 {camera_id}") + return + + # 设置摄像头分辨率(可选) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) + + # 设置输出视频 + if output_path: + # 确保输出目录存在 + os.makedirs(os.path.dirname(output_path), exist_ok=True) + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + else: + out = None + + print(f"🎥 开始摄像头实时识别 - 模式: {self.list_mode} (按 'q' 退出)...") + print(f"📋 注册人脸数: {len(self.registered_faces)}") + + while True: + ret, frame = cap.read() + if 
not ret: + print("❌ 无法读取摄像头帧") + break + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 添加实时信息 + current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0 + info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)} | Mode: {self.list_mode}" + cv2.putText(processed_frame, info_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 添加名单统计 + match_count = sum(1 for r in results if r['is_match']) + list_text = f"Match: {match_count}/{len(results)}" + cv2.putText(processed_frame, list_text, (10, 60), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 写入输出 + if out: + out.write(processed_frame) + + # 显示预览 + cv2.imshow('Real-time Face Recognition', processed_frame) + + # 按'q'退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + # 清理资源 + cap.release() + if out: + out.release() + cv2.destroyAllWindows() + + print("✅ 摄像头处理结束") + + +# 使用示例 +def main(): + # 创建视频识别系统 + video_system = VideoFaceRecognition(use_gpu=True) + + # 设置名单模式 + # video_system.set_list_mode("blacklist") # 黑名单模式 + video_system.set_list_mode("whitelist") # 白名单模式 + + # 加载注册人脸 + register_dir = "test_data/register" # 注册图片目录 + if os.path.exists(register_dir): + video_system.load_registered_faces(register_dir) + else: + print(f"⚠️ 注册目录不存在: {register_dir}") + + # # 选择处理模式 + # print("请选择处理模式:") + # print("1. 处理视频文件") + # print("2. 
实时摄像头") + # + # choice = input("请输入选择 (1 或 2): ").strip() + + choice = "1" + + if choice == "1": + # 处理视频文件 + video_path = "test_data/video/video_1.mp4" + output_path = "test_data/output_video/video_1_white.mp4" + + # 性能优化:跳帧处理 + skip_frames = 1 # 每2帧处理1帧,提高速度 + + video_system.process_video_file( + video_path=video_path, + output_path=output_path, + skip_frames=skip_frames, + show_preview=True + ) + + elif choice == "2": + # 实时摄像头 + output_path = "webcam_recording.mp4" # 可选:保存录制 + + video_system.process_webcam( + camera_id=0, + output_path=output_path + ) + + else: + print("❌ 无效选择") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/video_face_recognition_6.py b/src/video_face_recognition_6.py new file mode 100644 index 0000000..a1dc765 --- /dev/null +++ b/src/video_face_recognition_6.py @@ -0,0 +1,571 @@ +# video_face_recognition.py +import cv2 +import numpy as np +import time +from insightface.app import FaceAnalysis +from typing import List, Dict, Tuple +import os + +from sympy import false + + +#改进后的人脸质量显示 + +class VideoFaceRecognition: + """ + 视频人脸识别系统 + 支持实时视频流和视频文件处理 + """ + + def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True): + + # 质量阈值设置 + self.det_size = 640 # 320快速 640中等 1280慢 + + self.det_threshold = 0.7 # 人脸置信度 + self.clarity_threshold = 1000.0 # 清晰度阈值,低于此值认为人脸模糊 + self.min_face_size = 30 # 最小人脸像素尺寸 + self.pitch_threshold = 30 # 人脸置信度 + self.yaw_threshold = 20 # 人脸置信度 + + self.quality_threshold = 0.6 # 质量得分阈值 + self.similarity_threshold = 0.1 + + # 初始化人脸识别模型 + self.app = FaceAnalysis(name=model_name) + self.app.prepare( + ctx_id=0 if use_gpu else -1, + det_thresh=self.det_threshold, + det_size=(640, 640) + ) + + self.target_embedding = None + self.target_id = None + + + # 性能统计 + self.frame_count = 0 + self.processing_times = [] + + print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}") + + def set_target_face(self, image_path: str, person_id: str = "target") -> bool: + """设置目标人脸""" + img = 
cv2.imread(image_path) + if img is None: + print(f"❌ 无法读取目标图像: {image_path}") + return False + + faces = self.app.get(img) + if not faces: + print(f"❌ 目标图像中未检测到人脸: {image_path}") + return False + + self.target_embedding = faces[0].embedding + self.target_id = person_id + print(f"✅ 目标人脸设置: {person_id}") + return True + + def calculate_clarity(self, face_region: np.ndarray) -> float: + """ + 计算人脸区域的清晰度/模糊度 + 使用拉普拉斯方差方法:值越高表示图像越清晰 + """ + if len(face_region.shape) == 3: + gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY) + else: + gray = face_region + + # 计算拉普拉斯算子的方差 + laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var() + return laplacian_var + + def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]: + """ + 综合判断人脸质量是否可接受 + 返回: (是否可接受, 质量指标字典) + """ + quality_metrics = {} + + is_acceptable = True + + # 1. 检测置信度 + quality_metrics['det_score'] = float(face.det_score) + + # 2. 人脸姿态角度 + if hasattr(face, 'pose') and face.pose is not None: + pitch, yaw, roll = face.pose + quality_metrics['pitch'] = float(pitch) + quality_metrics['yaw'] = float(yaw) + quality_metrics['roll'] = float(roll) + else: + quality_metrics['pitch'] = 100.0 + quality_metrics['yaw'] = 100.0 + quality_metrics['roll'] = 100.0 + + # 3. 人脸边界框信息 + bbox = face.bbox + x1, y1, x2, y2 = bbox.astype(int) + width = x2 - x1 + height = y2 - y1 + quality_metrics['bbox_width'] = width + quality_metrics['bbox_height'] = height + quality_metrics['bbox_area'] = width * height + quality_metrics['aspect_ratio'] = width / height if height > 0 else 0 + + # 4. 图像清晰度检测 + # 提取人脸区域 + h, w = frame.shape[:2] + x1_clip = max(0, x1) + y1_clip = max(0, y1) + x2_clip = min(w, x2) + y2_clip = min(h, y2) + + if x2_clip > x1_clip and y2_clip > y1_clip: + face_region = frame[y1_clip:y2_clip, x1_clip:x2_clip] + clarity_score = self.calculate_clarity(face_region) + quality_metrics['clarity_score'] = clarity_score + else: + quality_metrics['clarity_score'] = 0.0 + + # 5. 
综合质量评分 + base_score = quality_metrics['det_score'] + + # 清晰度惩罚 + clarity_penalty = 0.0 + if quality_metrics['clarity_score'] < self.clarity_threshold: + clarity_penalty = 0.3 # 清晰度不足严重惩罚 + is_acceptable = False + + # 姿态惩罚 + pose_penalty = 0.0 + if abs(quality_metrics['yaw']) > self.yaw_threshold: + pose_penalty += 0.2 + is_acceptable = False + if abs(quality_metrics['pitch']) > self.pitch_threshold: + pose_penalty += 0.2 + is_acceptable = False + + # 尺寸惩罚 + size_penalty = 0.0 + # if quality_metrics['bbox_area'] < (self.min_face_size ** 2): + # size_penalty = 0.2 + if min(width, height) < self.min_face_size: + is_acceptable = False + size_penalty = 0.2 + + quality_metrics['quality_score'] = max(0.1, base_score - clarity_penalty - pose_penalty - size_penalty) + + # # 判断是否可接受 + # is_acceptable = ( + # quality_metrics['det_score'] > 0.5 and # 基础检测置信度 + # quality_metrics['clarity_score'] >= self.clarity_threshold and # 清晰度要求 + # quality_metrics['bbox_area'] >= (self.min_face_size ** 2) and # 最小尺寸要求 + # abs(quality_metrics['yaw']) < 60 and # 偏航角限制 + # abs(quality_metrics['pitch']) < 45 # 俯仰角限制 + # ) + + return is_acceptable, quality_metrics + + def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]: + """ + 处理单帧图像 + 返回: (处理后的帧, 识别结果列表) + """ + start_time = time.time() + + # 人脸检测和识别 + faces = self.app.get(frame) + + results = [] + for face in faces: + # 检查人脸质量是否可接受 + is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame) + + similarity = 0.0 + # 只有在人脸质量可接受时才计算相似度 + if is_acceptable and self.target_embedding is not None: + emb1 = face.embedding / np.linalg.norm(face.embedding) + emb2 = self.target_embedding / np.linalg.norm(self.target_embedding) + similarity = float(np.dot(emb1, emb2)) + + result = { + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'is_match': similarity >= self.similarity_threshold, + 'gender': 'Male' if face.gender == 1 else 'Female', + 'age': int(face.age), + 'det_score': 
float(face.det_score), + 'quality_metrics': quality_metrics, + 'is_acceptable': is_acceptable # 新增:是否可接受标志 + } + results.append(result) + + # 在帧上绘制结果 + frame = self._draw_detection(frame, result) + + # 性能统计 + processing_time = (time.time() - start_time) * 1000 + self.processing_times.append(processing_time) + self.frame_count += 1 + + return frame, results + + def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray: + """在帧上绘制检测结果和质量信息""" + bbox = result['bbox'] + similarity = result['similarity'] + is_match = result['is_match'] + is_acceptable = result['is_acceptable'] + quality_metrics = result['quality_metrics'] + + # 选择颜色 + if not is_acceptable: + color = (128, 128, 128) # 灰色 - 质量不可接受 + elif is_match: + color = (0, 255, 0) # 绿色 - 匹配 + else: + color = (0, 0, 255) # 红色 - 不匹配 + + # 绘制人脸框 + x1, y1, x2, y2 = bbox + cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) + + # 准备显示文本 - 只保留关键信息 + text_lines = [] + + # 第一行:匹配状态 + if not is_acceptable: + text_lines.append("LOW QUALITY") + elif self.target_id: + status = f"MATCH: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}" + text_lines.append(status) + else: + text_lines.append(f"Similarity: {similarity:.3f}") + + # 第二行:质量得分(根据阈值显示颜色) + text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}") + + # 第三行:检测得分 + text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}") + + # 第四行:清晰度 + text_lines.append(f"Clarity: {quality_metrics['clarity_score']:.1f}") + + # 第五行:姿态角度 + text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°") + text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°") + # text_lines.append(f"Roll: {quality_metrics['roll']:.1f}°") + + text_lines.append(f"Width: {quality_metrics['bbox_width']:.1f}") + text_lines.append(f"Height: {quality_metrics['bbox_height']:.1f}") + + + # 计算文本区域大小 + max_text_width = 0 + total_text_height = 0 + line_heights = [] + + for line in text_lines: + (text_width, text_height), baseline = cv2.getTextSize(line, 
cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1) + max_text_width = max(max_text_width, text_width) + line_heights.append(text_height + baseline) + total_text_height += text_height + baseline + 2 + + # 绘制文本背景 + bg_x1 = x1 + bg_y1 = y1 - total_text_height - 10 + bg_x2 = x1 + max_text_width + 10 + bg_y2 = y1 + + # 如果背景超出图像顶部,调整到框下方 + if bg_y1 < 0: + bg_y1 = y2 + bg_y2 = y2 + total_text_height + 10 + + # 绘制半透明背景 + overlay = frame.copy() + cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1) + alpha = 0.6 + cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame) + + # 绘制文本 + current_y = bg_y1 + 15 + for i, line in enumerate(text_lines): + # 根据内容选择颜色 - 按照你的要求简化颜色规则 + if i == 0: # 状态行 + if not is_acceptable: + text_color = (128, 128, 128) # 灰色 - 质量差 + elif is_match: + text_color = (0, 255, 0) # 绿色 - 匹配 + else: + text_color = (0, 0, 255) # 红色 - 不匹配 + # elif i == 1: # 质量得分行 + # # 质量得分:低于阈值红色,大于等于阈值绿色 + # if quality_metrics['quality_score'] >= self.quality_threshold: + # text_color = (0, 255, 0) # 绿色 - 高质量 + # else: + # text_color = (0, 0, 255) # 红色 - 低质量 + elif i == 3: # 清晰度行 + # 清晰度:低于阈值红色,大于等于阈值绿色 + if quality_metrics['clarity_score'] >= self.clarity_threshold: + text_color = (255, 255, 255) + else: + text_color = (0, 0, 255) + elif i == 4: # pitch + if abs(quality_metrics['pitch']) > self.pitch_threshold: + text_color = (0, 0, 255) # 红色 + else: + text_color = (255, 255, 255) + elif i == 5: # yaw + if abs(quality_metrics['yaw']) > self.yaw_threshold: + text_color = (0, 0, 255) # 红色 + else: + text_color = (255, 255, 255) + elif i == 6: # + if quality_metrics['bbox_width'] < self.min_face_size: + text_color = (0, 0, 255) # 红色 + else: + text_color = (255, 255, 255) + elif i == 7: # + if quality_metrics['bbox_height'] < self.min_face_size: + text_color = (0, 0, 255) # 红色 + else: + text_color = (255, 255, 255) + else: + text_color = (255, 255, 255) # 白色 - 其他信息 + + cv2.putText(frame, line, (x1 + 5, current_y), + cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1) + current_y 
+= line_heights[i] + + return frame + + def set_quality_thresholds(self, clarity_threshold: float = None, + quality_threshold: float = None, + min_face_size: int = None): + """设置质量阈值""" + if clarity_threshold is not None: + self.clarity_threshold = clarity_threshold + if quality_threshold is not None: + self.quality_threshold = quality_threshold + if min_face_size is not None: + self.min_face_size = min_face_size + print( + f"✅ 质量阈值更新 - 清晰度: {self.clarity_threshold}, 质量得分: {self.quality_threshold}, 最小尺寸: {self.min_face_size}") + + def process_video_file(self, video_path: str, output_path: str = None, + skip_frames: int = 0, show_preview: bool = True): + """ + 处理视频文件 + + Args: + video_path: 输入视频路径 + output_path: 输出视频路径 + skip_frames: 跳帧数,用于提高处理速度 + show_preview: 是否显示实时预览 + """ + if not os.path.exists(video_path): + print(f"❌ 视频文件不存在: {video_path}") + return + + # 打开视频文件 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + print(f"❌ 无法打开视频文件: {video_path}") + return + + # 获取视频信息 + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + + print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}") + + # 设置输出视频 + if output_path: + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height)) + else: + out = None + + # 处理视频帧 + frame_index = 0 + processed_frames = 0 + start_time = time.time() + + print("🚀 开始处理视频...") + + while True: + ret, frame = cap.read() + if not ret: + break + + # 跳帧处理 + if skip_frames > 0 and frame_index % (skip_frames + 1) != 0: + frame_index += 1 + continue + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 写入输出视频 + if out: + out.write(processed_frame) + + # 显示预览 + if show_preview: + # 添加性能信息 + fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)}" + cv2.putText(processed_frame, fps_text, 
(10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + cv2.imshow('Video Face Recognition', processed_frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + frame_index += 1 + processed_frames += 1 + + # 进度显示 + if frame_index % 30 == 0: + progress = (frame_index / total_frames) * 100 + print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})") + + # 清理资源 + cap.release() + if out: + out.release() + if show_preview: + cv2.destroyAllWindows() + + # 性能统计 + total_time = time.time() - start_time + avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0 + + print(f"\n🎉 视频处理完成!") + print(f"📊 性能统计:") + print(f" 总处理帧数: {processed_frames}") + print(f" 总耗时: {total_time:.1f}秒") + print(f" 平均每帧: {avg_processing_time:.1f}ms") + print(f" 实际FPS: {processed_frames / total_time:.1f}") + if output_path: + print(f" 输出视频: {output_path}") + + def process_webcam(self, camera_id: int = 0, output_path: str = None): + """ + 处理摄像头实时视频流 + """ + cap = cv2.VideoCapture(camera_id) + if not cap.isOpened(): + print(f"❌ 无法打开摄像头 {camera_id}") + return + + # 设置摄像头分辨率(可选) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) + + # 设置输出视频 + if output_path: + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + else: + out = None + + print("🎥 开始摄像头实时识别 (按 'q' 退出)...") + + while True: + ret, frame = cap.read() + if not ret: + print("❌ 无法读取摄像头帧") + break + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 添加实时信息 + current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0 + info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}" + cv2.putText(processed_frame, info_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 写入输出 + if out: + out.write(processed_frame) + + # 显示预览 + 
cv2.imshow('Real-time Face Recognition', processed_frame) + + # 按'q'退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + # 清理资源 + cap.release() + if out: + out.release() + cv2.destroyAllWindows() + + print("✅ 摄像头处理结束") + + +# 使用示例 +def main(): + # 创建视频识别系统 + video_system = VideoFaceRecognition(use_gpu=True) + + # 设置质量阈值(可根据实际情况调整) + video_system.set_quality_thresholds( + clarity_threshold=1000.0, # 清晰度阈值 + quality_threshold=0.6, # 质量得分阈值 + min_face_size=30 + ) + + # 设置目标人脸(可选) + target_image = "test_data/register/sy.jpg" + + if os.path.exists(target_image): + video_system.set_target_face(target_image, "目标人物") + + # # 选择处理模式 + # print("请选择处理模式:") + # print("1. 处理视频文件") + # print("2. 实时摄像头") + # + # choice = input("请输入选择 (1 或 2): ").strip() + choice = "1" + + if choice == "1": + # 处理视频文件 + video_path = "test_data/video/video_1.mp4" + output_path = "test_data/output_video/video_6_quality.mp4" + + # 性能优化:跳帧处理 + skip_frames = 1 + + video_system.process_video_file( + video_path=video_path, + output_path=output_path, + skip_frames=skip_frames, + show_preview=True + ) + + elif choice == "2": + # 实时摄像头 + output_path = "webcam_recording.mp4" + + video_system.process_webcam( + camera_id=0, + output_path=output_path + ) + + else: + print("❌ 无效选择") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/video_face_recognition_7.py b/src/video_face_recognition_7.py new file mode 100644 index 0000000..30ff4d6 --- /dev/null +++ b/src/video_face_recognition_7.py @@ -0,0 +1,684 @@ +# video_face_recognition.py +import cv2 +import numpy as np +import time +from insightface.app import FaceAnalysis +from typing import List, Dict, Tuple, Optional +import os +import glob + + +#改进后的人脸质量显示 + +class VideoFaceRecognition: + """ + 视频人脸识别系统 + 支持实时视频流和视频文件处理 + 支持黑名单和白名单模式 + """ + + # buffalo_l, buffalo_sc + + def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True): + + # 质量阈值设置 + self.det_size = 640 # 320快速 640中等 1280慢 + + #white + self.list_mode = 
"whitelist" # "blacklist" 或 "whitelist" + self.det_threshold = 0.7 # 人脸置信度 + self.clarity_threshold = 1000.0 # 清晰度阈值,低于此值认为人脸模糊 + self.min_face_size = 30 # 最小人脸像素尺寸 + self.pitch_threshold = 40 # + self.yaw_threshold = 40 # + self.quality_threshold = 0.6 # 质量得分阈值 + self.similarity_threshold = 0.13 + + # #black + # self.list_mode = "blacklist" # "blacklist" 或 "whitelist" + # self.det_threshold = 0.5 # 人脸置信度 + # self.clarity_threshold = 100.0 # 清晰度阈值,低于此值认为人脸模糊 + # self.min_face_size = 20 # 最小人脸像素尺寸 + # self.pitch_threshold = 90 # + # self.yaw_threshold = 90 # + # self.quality_threshold = 0.6 # 质量得分阈值 + # self.similarity_threshold = 0.3 + + # 初始化人脸识别模型 + self.app = FaceAnalysis(name=model_name) + self.app.prepare( + ctx_id=0 if use_gpu else -1, + det_thresh=self.det_threshold, + det_size=(self.det_size,self. det_size) + ) + + # 名单相关变量 + self.registered_faces = {} # {name: embedding} + + # 性能统计 + self.frame_count = 0 + self.processing_times = [] + + print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}") + + def set_list_mode(self, mode: str): + """设置名单模式""" + if mode.lower() in ["blacklist", "whitelist"]: + self.list_mode = mode.lower() + print(f"✅ 名单模式设置为: {self.list_mode}") + else: + print("❌ 无效的名单模式,请使用 'blacklist' 或 'whitelist'") + + def load_registered_faces(self, register_dir: str): + """ + 从目录加载注册的人脸图片 + 文件名(去掉后缀)即为人的名字 + """ + if not os.path.exists(register_dir): + print(f"❌ 注册目录不存在: {register_dir}") + return False + + # 支持的图片格式 + image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp'] + image_files = [] + + for ext in image_extensions: + image_files.extend(glob.glob(os.path.join(register_dir, ext))) + image_files.extend(glob.glob(os.path.join(register_dir, ext.upper()))) + + if not image_files: + print(f"❌ 在目录 {register_dir} 中未找到图片文件") + return False + + loaded_count = 0 + for image_path in image_files: + # 获取文件名(不含扩展名)作为人名 + person_name = os.path.splitext(os.path.basename(image_path))[0] + + # 读取图片并提取人脸特征 + img = cv2.imread(image_path) + if img is None: + print(f"❌ 
无法读取图片: {image_path}") + continue + + faces = self.app.get(img) + if not faces: + print(f"❌ 图片中未检测到人脸: {image_path}") + continue + + # 使用第一张检测到的人脸 + self.registered_faces[person_name] = faces[0].embedding + loaded_count += 1 + print(f"✅ 加载注册人脸: {person_name}") + + print(f"🎉 成功加载 {loaded_count} 张注册人脸") + return loaded_count > 0 + + def find_best_match(self, embedding: np.ndarray) -> Tuple[Optional[str], float]: + """ + 在注册人脸中查找最佳匹配 + 返回: (匹配的人名, 相似度) + """ + if not self.registered_faces: + return None, 0.0 + + best_similarity = 0.0 + best_name = None + + # 归一化查询嵌入 + query_emb = embedding / np.linalg.norm(embedding) + + for name, registered_embedding in self.registered_faces.items(): + # 归一化注册嵌入 + reg_emb = registered_embedding / np.linalg.norm(registered_embedding) + + # 计算余弦相似度 + similarity = float(np.dot(query_emb, reg_emb)) + + if similarity > best_similarity: + best_similarity = similarity + best_name = name + + return best_name, best_similarity + + + def calculate_clarity(self, face_region: np.ndarray) -> float: + """ + 计算人脸区域的清晰度/模糊度 + 使用拉普拉斯方差方法:值越高表示图像越清晰 + """ + if len(face_region.shape) == 3: + gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY) + else: + gray = face_region + + # 计算拉普拉斯算子的方差 + laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var() + return laplacian_var + + def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]: + """ + 综合判断人脸质量是否可接受 + 返回: (是否可接受, 质量指标字典) + """ + quality_metrics = {} + + is_acceptable = True + + # 1. 检测置信度 + quality_metrics['det_score'] = float(face.det_score) + + # 2. 人脸姿态角度 + if hasattr(face, 'pose') and face.pose is not None: + pitch, yaw, roll = face.pose + quality_metrics['pitch'] = float(pitch) + quality_metrics['yaw'] = float(yaw) + quality_metrics['roll'] = float(roll) + else: + quality_metrics['pitch'] = 100.0 + quality_metrics['yaw'] = 100.0 + quality_metrics['roll'] = 100.0 + + # 3. 
人脸边界框信息 + bbox = face.bbox + x1, y1, x2, y2 = bbox.astype(int) + width = x2 - x1 + height = y2 - y1 + quality_metrics['bbox_width'] = width + quality_metrics['bbox_height'] = height + quality_metrics['bbox_area'] = width * height + quality_metrics['aspect_ratio'] = width / height if height > 0 else 0 + + # 4. 图像清晰度检测 + # 提取人脸区域 + h, w = frame.shape[:2] + x1_clip = max(0, x1) + y1_clip = max(0, y1) + x2_clip = min(w, x2) + y2_clip = min(h, y2) + + if x2_clip > x1_clip and y2_clip > y1_clip: + face_region = frame[y1_clip:y2_clip, x1_clip:x2_clip] + clarity_score = self.calculate_clarity(face_region) + quality_metrics['clarity_score'] = clarity_score + else: + quality_metrics['clarity_score'] = 0.0 + + # 5. 综合质量评分 + base_score = quality_metrics['det_score'] + + # 清晰度惩罚 + clarity_penalty = 0.0 + if quality_metrics['clarity_score'] < self.clarity_threshold: + clarity_penalty = 0.3 # 清晰度不足严重惩罚 + is_acceptable = False + + # 姿态惩罚 + pose_penalty = 0.0 + if abs(quality_metrics['yaw']) > self.yaw_threshold: + pose_penalty += 0.2 + is_acceptable = False + if abs(quality_metrics['pitch']) > self.pitch_threshold: + pose_penalty += 0.2 + is_acceptable = False + + # 尺寸惩罚 + size_penalty = 0.0 + # if quality_metrics['bbox_area'] < (self.min_face_size ** 2): + # size_penalty = 0.2 + if min(width, height) < self.min_face_size: + is_acceptable = False + size_penalty = 0.2 + + quality_metrics['quality_score'] = max(0.1, base_score - clarity_penalty - pose_penalty - size_penalty) + + # # 判断是否可接受 + # is_acceptable = ( + # quality_metrics['det_score'] > 0.5 and # 基础检测置信度 + # quality_metrics['clarity_score'] >= self.clarity_threshold and # 清晰度要求 + # quality_metrics['bbox_area'] >= (self.min_face_size ** 2) and # 最小尺寸要求 + # abs(quality_metrics['yaw']) < 60 and # 偏航角限制 + # abs(quality_metrics['pitch']) < 45 # 俯仰角限制 + # ) + + return is_acceptable, quality_metrics + + def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]: + """ + 处理单帧图像 + 返回: (处理后的帧, 识别结果列表) + """ + 
start_time = time.time() + + # 人脸检测和识别 + faces = self.app.get(frame) + + results = [] + for face in faces: + # 检查人脸质量是否可接受 + is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame) + + # 查找最佳匹配 + best_name, similarity = self.find_best_match(face.embedding) + + # 根据名单模式判断是否匹配 + if self.list_mode == "blacklist": + # 黑名单模式:在黑名单中即为匹配(需要关注) + is_match = best_name is not None and similarity >= self.similarity_threshold + else: # whitelist + # 白名单模式:在白名单中即为匹配(允许通过) + is_match = best_name is not None and similarity >= self.similarity_threshold + + result = { + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'best_match': best_name, + 'is_match': is_match, + # 'gender': 'Male' if face.gender == 1 else 'Female', + # 'age': int(face.age), + 'det_score': float(face.det_score), + 'quality_metrics': quality_metrics, + 'is_acceptable': is_acceptable # 新增:是否可接受标志 + } + results.append(result) + + # 在帧上绘制结果 + frame = self._draw_detection(frame, result) + + # 性能统计 + processing_time = (time.time() - start_time) * 1000 + self.processing_times.append(processing_time) + self.frame_count += 1 + + return frame, results + + def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray: + """在帧上绘制检测结果和质量信息""" + bbox = result['bbox'] + similarity = result['similarity'] + is_match = result['is_match'] + is_acceptable = result['is_acceptable'] + quality_metrics = result['quality_metrics'] + best_match = result['best_match'] + + + + # 选择颜色 + if not is_acceptable: + color = (128, 128, 128) # 灰色 - 质量不可接受 + else: + # 选择颜色 - 根据名单模式 + if self.list_mode == "blacklist": + # 黑名单模式:匹配(在黑名单中)显示红色,不匹配显示绿色 + color = (0, 0, 255) if is_match else (0, 255, 0) # 红色-黑名单, 绿色-正常 + else: # whitelist + # 白名单模式:匹配(在白名单中)显示绿色,不匹配显示红色 + color = (0, 255, 0) if is_match else (0, 0, 255) # 绿色-白名单, 红色-陌生人 + + # 绘制人脸框 + x1, y1, x2, y2 = bbox + cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) + + # 准备显示文本 - 只保留关键信息 + text_lines = [] + + # 第一行:匹配状态 + if not 
is_acceptable: + text_lines.append("LOW QUALITY") + else: + status = f"MATCH: {best_match}: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}" + text_lines.append(status) + + # 第二行:质量得分(根据阈值显示颜色) + text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}") + + # 第三行:检测得分 + text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}") + + # 第四行:清晰度 + text_lines.append(f"Clarity: {quality_metrics['clarity_score']:.1f}") + + # 第五行:姿态角度 + text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°") + text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°") + # text_lines.append(f"Roll: {quality_metrics['roll']:.1f}°") + + text_lines.append(f"Width: {quality_metrics['bbox_width']:.1f}") + text_lines.append(f"Height: {quality_metrics['bbox_height']:.1f}") + + + # 计算文本区域大小 + max_text_width = 0 + total_text_height = 0 + line_heights = [] + + for line in text_lines: + (text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1) + max_text_width = max(max_text_width, text_width) + line_heights.append(text_height + baseline) + total_text_height += text_height + baseline + 2 + + # 绘制文本背景 + bg_x1 = x1 + bg_y1 = y1 - total_text_height - 10 + bg_x2 = x1 + max_text_width + 10 + bg_y2 = y1 + + # 如果背景超出图像顶部,调整到框下方 + if bg_y1 < 0: + bg_y1 = y2 + bg_y2 = y2 + total_text_height + 10 + + # 绘制半透明背景 + overlay = frame.copy() + cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1) + alpha = 0.6 + cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame) + + # 绘制文本 + current_y = bg_y1 + 15 + for i, line in enumerate(text_lines): + # 根据内容选择颜色 - 按照你的要求简化颜色规则 + if i == 0: # 状态行 + if not is_acceptable: + text_color = (128, 128, 128) # 灰色 - 质量差 + elif is_match: + text_color = (0, 255, 0) # 绿色 - 匹配 + else: + text_color = (0, 0, 255) # 红色 - 不匹配 + # elif i == 1: # 质量得分行 + # # 质量得分:低于阈值红色,大于等于阈值绿色 + # if quality_metrics['quality_score'] >= self.quality_threshold: + # text_color = (0, 255, 0) # 绿色 - 高质量 + # else: + # 
text_color = (0, 0, 255) # 红色 - 低质量 + elif i == 3: # 清晰度行 + # 清晰度:低于阈值红色,大于等于阈值绿色 + if quality_metrics['clarity_score'] >= self.clarity_threshold: + text_color = (255, 255, 255) + else: + text_color = (0, 0, 255) + elif i == 4: # pitch + if abs(quality_metrics['pitch']) > self.pitch_threshold: + text_color = (0, 0, 255) # 红色 + else: + text_color = (255, 255, 255) + elif i == 5: # yaw + if abs(quality_metrics['yaw']) > self.yaw_threshold: + text_color = (0, 0, 255) # 红色 + else: + text_color = (255, 255, 255) + elif i == 6: # + if quality_metrics['bbox_width'] < self.min_face_size: + text_color = (0, 0, 255) # 红色 + else: + text_color = (255, 255, 255) + elif i == 7: # + if quality_metrics['bbox_height'] < self.min_face_size: + text_color = (0, 0, 255) # 红色 + else: + text_color = (255, 255, 255) + else: + text_color = (255, 255, 255) # 白色 - 其他信息 + + cv2.putText(frame, line, (x1 + 5, current_y), + cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1) + current_y += line_heights[i] + + return frame + + def set_quality_thresholds(self, clarity_threshold: float = None, + quality_threshold: float = None, + min_face_size: int = None): + """设置质量阈值""" + if clarity_threshold is not None: + self.clarity_threshold = clarity_threshold + if quality_threshold is not None: + self.quality_threshold = quality_threshold + if min_face_size is not None: + self.min_face_size = min_face_size + print( + f"✅ 质量阈值更新 - 清晰度: {self.clarity_threshold}, 质量得分: {self.quality_threshold}, 最小尺寸: {self.min_face_size}") + + def process_video_file(self, video_path: str, output_path: str = None, + skip_frames: int = 0, show_preview: bool = True): + """ + 处理视频文件 + + Args: + video_path: 输入视频路径 + output_path: 输出视频路径 + skip_frames: 跳帧数,用于提高处理速度 + show_preview: 是否显示实时预览 + """ + if not os.path.exists(video_path): + print(f"❌ 视频文件不存在: {video_path}") + return + + # 打开视频文件 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + print(f"❌ 无法打开视频文件: {video_path}") + return + + # 获取视频信息 + fps = 
cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + + print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}") + print(f"🎯 当前模式: {self.list_mode}, 注册人脸数: {len(self.registered_faces)}") + + # 设置输出视频 + if output_path: + # 确保输出目录存在 + os.makedirs(os.path.dirname(output_path), exist_ok=True) + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames), (width, height)) + else: + out = None + + # 处理视频帧 + frame_index = 0 + processed_frames = 0 + start_time = time.time() + + print("🚀 开始处理视频...") + + while True: + ret, frame = cap.read() + if not ret: + break + + # 跳帧处理 + if skip_frames > 0 and frame_index % (skip_frames + 1) != 0: + frame_index += 1 + continue + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 写入输出视频 + if out: + out.write(processed_frame) + + # 显示预览 + if show_preview: + # 添加性能信息 + fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)} | Mode: {self.list_mode}" + cv2.putText(processed_frame, fps_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 添加名单统计 + match_count = sum(1 for r in results if r['is_match']) + list_text = f"Match: {match_count}/{len(results)}" + cv2.putText(processed_frame, list_text, (10, 60), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + cv2.imshow('Video Face Recognition', processed_frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + frame_index += 1 + processed_frames += 1 + + # 进度显示 + if frame_index % 30 == 0: + progress = (frame_index / total_frames) * 100 + print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})") + + # 清理资源 + cap.release() + if out: + out.release() + if show_preview: + cv2.destroyAllWindows() + + # 性能统计 + total_time = time.time() - start_time + avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0 + + print(f"\n🎉 
视频处理完成!") + print(f"📊 性能统计:") + print(f" 总处理帧数: {processed_frames}") + print(f" 总耗时: {total_time:.1f}秒") + print(f" 平均每帧: {avg_processing_time:.1f}ms") + print(f" 实际FPS: {processed_frames / total_time:.1f}") + if output_path: + print(f" 输出视频: {output_path}") + + def process_webcam(self, camera_id: int = 0, output_path: str = None): + """ + 处理摄像头实时视频流 + """ + cap = cv2.VideoCapture(camera_id) + if not cap.isOpened(): + print(f"❌ 无法打开摄像头 {camera_id}") + return + + # 设置摄像头分辨率(可选) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) + + # 设置输出视频 + if output_path: + # 确保输出目录存在 + os.makedirs(os.path.dirname(output_path), exist_ok=True) + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + else: + out = None + + print(f"🎥 开始摄像头实时识别 - 模式: {self.list_mode} (按 'q' 退出)...") + print(f"📋 注册人脸数: {len(self.registered_faces)}") + + while True: + ret, frame = cap.read() + if not ret: + print("❌ 无法读取摄像头帧") + break + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 添加实时信息 + current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0 + info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)} | Mode: {self.list_mode}" + cv2.putText(processed_frame, info_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 添加名单统计 + match_count = sum(1 for r in results if r['is_match']) + list_text = f"Match: {match_count}/{len(results)}" + cv2.putText(processed_frame, list_text, (10, 60), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 写入输出 + if out: + out.write(processed_frame) + + # 显示预览 + cv2.imshow('Real-time Face Recognition', processed_frame) + + # 按'q'退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + # 清理资源 + cap.release() + if out: + out.release() + cv2.destroyAllWindows() + + print("✅ 摄像头处理结束") + + 

# Usage example
def main():
    """Demo entry point: load registered faces, then run the selected mode."""
    video_system = VideoFaceRecognition(use_gpu=True)

    # Optional list mode:
    # video_system.set_list_mode("blacklist")
    # video_system.set_list_mode("whitelist")

    # Load registered faces from disk, if the directory exists.
    register_dir = "test_data/register"
    if not os.path.exists(register_dir):
        print(f"⚠️ 注册目录不存在: {register_dir}")
    else:
        video_system.load_registered_faces(register_dir)

    # Mode selection is hard-wired for now ("1" = video file, "2" = webcam);
    # the interactive prompt from earlier revisions is intentionally disabled.
    choice = "1"

    if choice == "1":
        # Process a video file; skip_frames=2 processes every 3rd frame for speed.
        video_system.process_video_file(
            video_path="test_data/video/video_2.mp4",
            output_path="test_data/output_video/video_2_white_7_gpu.mp4",
            skip_frames=2,
            show_preview=False,
        )
    elif choice == "2":
        # Real-time webcam recognition with recording.
        video_system.process_webcam(
            camera_id=0,
            output_path="webcam_recording.mp4",
        )
    else:
        print("❌ 无效选择")


if __name__ == "__main__":
    main()