"""Video face recognition with per-face quality display.

Detects faces in a video file or a live camera stream with InsightFace,
scores each face (detector confidence, sharpness, pose, size), compares
acceptable faces against an optional target embedding, and draws the
result and the quality metrics onto every processed frame.
"""
import os
import time
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
from insightface.app import FaceAnalysis
# NOTE(review): `false` is not referenced anywhere in this file; it looks like
# an accidental IDE auto-import. Kept so the module namespace is unchanged --
# confirm and remove together with the sympy dependency.
from sympy import false


class VideoFaceRecognition:
    """Face recognition for live video streams and video files."""

    # Drawing constants (colours are BGR, as OpenCV expects).
    _FONT = cv2.FONT_HERSHEY_SIMPLEX
    _LABEL_SCALE = 0.4
    _GREEN = (0, 255, 0)
    _RED = (0, 0, 255)
    _GRAY = (128, 128, 128)
    _WHITE = (255, 255, 255)
    _LABEL_BG_ALPHA = 0.6  # opacity of the black panel behind the labels
    _FALLBACK_FPS = 25.0  # used when the source does not report a frame rate

    def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True):
        """Load the InsightFace model and set the default thresholds.

        Args:
            model_name: InsightFace model pack name passed to ``FaceAnalysis``.
            use_gpu: When True the model is prepared with ``ctx_id=0``,
                otherwise with ``ctx_id=-1``.
        """
        # Quality thresholds.
        self.det_size = 640  # detector input side: 320 fast, 640 medium, 1280 slow
        self.det_threshold = 0.7  # minimum detector confidence
        self.clarity_threshold = 1000.0  # Laplacian variance below this counts as blurry
        self.min_face_size = 30  # minimum face side length in pixels
        self.pitch_threshold = 30  # maximum accepted |pitch|
        self.yaw_threshold = 20  # maximum accepted |yaw|
        # Stored and reported, but not consulted by the acceptance check.
        self.quality_threshold = 0.6
        self.similarity_threshold = 0.1  # minimum cosine similarity for a match

        # Initialise the face analysis model.
        self.app = FaceAnalysis(name=model_name)
        self.app.prepare(
            ctx_id=0 if use_gpu else -1,
            det_thresh=self.det_threshold,
            # Use the configured size instead of a second hard-coded 640.
            det_size=(self.det_size, self.det_size)
        )

        self.target_embedding = None
        self.target_id = None

        # Performance statistics.
        self.frame_count = 0
        self.processing_times = []  # per-frame processing time in milliseconds

        print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}")

    def set_target_face(self, image_path: str, person_id: str = "target") -> bool:
        """Register the face to search for.

        Args:
            image_path: Path of an image containing the target face.
            person_id: Label stored for the target.

        Returns:
            True when a face was found and stored; False when the image could
            not be read or contained no detectable face.
        """
        img = cv2.imread(image_path)
        if img is None:
            print(f"❌ 无法读取目标图像: {image_path}")
            return False

        faces = self.app.get(img)
        if not faces:
            print(f"❌ 目标图像中未检测到人脸: {image_path}")
            return False

        # The first returned face is used; the image is expected to show
        # the target person only.
        self.target_embedding = faces[0].embedding
        self.target_id = person_id
        print(f"✅ 目标人脸设置: {person_id}")
        return True

    def calculate_clarity(self, face_region: np.ndarray) -> float:
        """Return the sharpness of a face crop.

        Uses the variance of the Laplacian: a higher value means a sharper
        image.

        Args:
            face_region: Image crop, either 3-channel BGR or single channel.

        Returns:
            Variance of the Laplacian of the grayscale crop.
        """
        if len(face_region.shape) == 3:
            gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY)
        else:
            gray = face_region

        return float(cv2.Laplacian(gray, cv2.CV_64F).var())

    def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]:
        """Decide whether a detected face is good enough to be compared.

        A face is rejected when its crop is blurrier than
        ``clarity_threshold``, when |yaw| or |pitch| exceeds its threshold,
        or when its shorter side is below ``min_face_size``.

        Args:
            face: A face object returned by ``self.app.get``.
            frame: The frame the face was detected in.

        Returns:
            ``(is_acceptable, metrics)`` where ``metrics`` holds
            ``det_score``, ``pitch``, ``yaw``, ``roll``, ``bbox_width``,
            ``bbox_height``, ``bbox_area``, ``aspect_ratio``,
            ``clarity_score`` and ``quality_score``.
        """
        metrics: Dict = {}
        is_acceptable = True

        # 1. Detector confidence.
        metrics['det_score'] = float(face.det_score)

        # 2. Head pose.
        pose = getattr(face, 'pose', None)
        if pose is not None:
            pitch, yaw, roll = pose
            metrics['pitch'] = float(pitch)
            metrics['yaw'] = float(yaw)
            metrics['roll'] = float(roll)
        else:
            # No pose available: use a value that fails the default pose
            # thresholds, so the face is not accepted.
            metrics['pitch'] = 100.0
            metrics['yaw'] = 100.0
            metrics['roll'] = 100.0

        # 3. Bounding box geometry (plain ints so the metrics stay simple
        #    Python values).
        x1, y1, x2, y2 = (int(v) for v in face.bbox.astype(int))
        width = x2 - x1
        height = y2 - y1
        metrics['bbox_width'] = width
        metrics['bbox_height'] = height
        metrics['bbox_area'] = width * height
        metrics['aspect_ratio'] = width / height if height > 0 else 0

        # 4. Sharpness of the face crop, with the box clipped to the frame.
        frame_h, frame_w = frame.shape[:2]
        cx1, cy1 = max(0, x1), max(0, y1)
        cx2, cy2 = min(frame_w, x2), min(frame_h, y2)
        if cx2 > cx1 and cy2 > cy1:
            metrics['clarity_score'] = self.calculate_clarity(frame[cy1:cy2, cx1:cx2])
        else:
            metrics['clarity_score'] = 0.0

        # 5. Combined score: detector confidence minus penalties.
        clarity_penalty = 0.0
        if metrics['clarity_score'] < self.clarity_threshold:
            clarity_penalty = 0.3  # blur is penalised most heavily
            is_acceptable = False

        pose_penalty = 0.0
        if abs(metrics['yaw']) > self.yaw_threshold:
            pose_penalty += 0.2
            is_acceptable = False
        if abs(metrics['pitch']) > self.pitch_threshold:
            pose_penalty += 0.2
            is_acceptable = False

        size_penalty = 0.0
        if min(width, height) < self.min_face_size:
            size_penalty = 0.2
            is_acceptable = False

        # The score is floored at 0.1.
        metrics['quality_score'] = max(
            0.1,
            metrics['det_score'] - clarity_penalty - pose_penalty - size_penalty
        )

        return is_acceptable, metrics

    @staticmethod
    def _unit_vector(vec) -> Optional[np.ndarray]:
        """Return ``vec`` scaled to unit length, or None if that is impossible."""
        if vec is None:
            return None
        norm = np.linalg.norm(vec)
        if not np.isfinite(norm) or norm == 0:
            return None
        return vec / norm

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """Detect, score and annotate every face in one frame.

        Args:
            frame: BGR image; it is annotated in place.

        Returns:
            ``(frame, results)`` where ``results`` has one dict per face with
            the keys ``bbox``, ``similarity``, ``is_match``, ``gender``,
            ``age``, ``det_score``, ``quality_metrics`` and ``is_acceptable``.
        """
        started = time.perf_counter()

        faces = self.app.get(frame)
        # Normalise the target once per frame rather than once per face.
        target_unit = self._unit_vector(self.target_embedding)

        results = []
        for face in faces:
            is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)

            # Similarity is only computed for acceptable faces.
            similarity = 0.0
            compared = False
            if is_acceptable and target_unit is not None:
                face_unit = self._unit_vector(face.embedding)
                if face_unit is not None:
                    similarity = float(np.dot(face_unit, target_unit))
                    compared = True

            result = {
                'bbox': face.bbox.astype(int).tolist(),
                'similarity': similarity,
                # A match requires that a comparison actually happened, so a
                # threshold <= 0 cannot mark uncompared faces as matches.
                'is_match': compared and similarity >= self.similarity_threshold,
                'gender': 'Male' if face.gender == 1 else 'Female',
                'age': int(face.age),
                'det_score': float(face.det_score),
                'quality_metrics': quality_metrics,
                'is_acceptable': is_acceptable
            }
            results.append(result)

            frame = self._draw_detection(frame, result)

        # Performance statistics (milliseconds).
        self.processing_times.append((time.perf_counter() - started) * 1000)
        self.frame_count += 1

        return frame, results

    def _shade_region(self, frame: np.ndarray, x1: int, y1: int, x2: int, y2: int) -> None:
        """Darken the rectangle (x1, y1)-(x2, y2), inclusive, in place."""
        frame_h, frame_w = frame.shape[:2]
        left, top = max(0, x1), max(0, y1)
        right, bottom = min(frame_w, x2 + 1), min(frame_h, y2 + 1)
        if right <= left or bottom <= top:
            return  # the panel lies completely outside the frame
        roi = frame[top:bottom, left:right]
        # Blending with black only inside the panel avoids copying and
        # re-blending the whole frame for every face.
        frame[top:bottom, left:right] = cv2.addWeighted(
            roi, 1 - self._LABEL_BG_ALPHA, np.zeros_like(roi), self._LABEL_BG_ALPHA, 0
        )

    def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
        """Draw the face box and the quality labels for one result.

        Args:
            frame: BGR image, modified in place.
            result: One entry of the list returned by ``process_frame``.

        Returns:
            The same ``frame`` object.
        """
        x1, y1, x2, y2 = result['bbox']
        similarity = result['similarity']
        is_match = result['is_match']
        is_acceptable = result['is_acceptable']
        metrics = result['quality_metrics']

        # Gray: quality rejected, green: match, red: no match.
        if not is_acceptable:
            status_color = self._GRAY
        elif is_match:
            status_color = self._GREEN
        else:
            status_color = self._RED

        cv2.rectangle(frame, (x1, y1), (x2, y2), status_color, 2)

        if not is_acceptable:
            status = "LOW QUALITY"
        elif self.target_id:
            status = f"MATCH: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}"
        else:
            status = f"Similarity: {similarity:.3f}"

        def flag(failed: bool) -> Tuple[int, int, int]:
            """Red for a metric that fails its threshold, white otherwise."""
            return self._RED if failed else self._WHITE

        # Each label carries its own colour, so adding or reordering lines
        # cannot put a colour on the wrong metric.
        labels = [
            (status, status_color),
            (f"Quality: {metrics['quality_score']:.3f}", self._WHITE),
            (f"DetScore: {metrics['det_score']:.3f}", self._WHITE),
            (f"Clarity: {metrics['clarity_score']:.1f}",
             flag(not metrics['clarity_score'] >= self.clarity_threshold)),
            (f"Pitch: {metrics['pitch']:.1f}°",
             flag(abs(metrics['pitch']) > self.pitch_threshold)),
            (f"Yaw: {metrics['yaw']:.1f}°",
             flag(abs(metrics['yaw']) > self.yaw_threshold)),
            (f"Width: {metrics['bbox_width']:.1f}",
             flag(metrics['bbox_width'] < self.min_face_size)),
            (f"Height: {metrics['bbox_height']:.1f}",
             flag(metrics['bbox_height'] < self.min_face_size)),
        ]

        # Measure the label block.
        max_text_width = 0
        total_text_height = 0
        line_heights = []
        for text, _ in labels:
            (text_width, text_height), baseline = cv2.getTextSize(
                text, self._FONT, self._LABEL_SCALE, 1)
            max_text_width = max(max_text_width, text_width)
            line_heights.append(text_height + baseline)
            total_text_height += text_height + baseline + 2

        # Panel above the face box, or below it when it would leave the
        # top of the image.
        bg_x1 = x1
        bg_y1 = y1 - total_text_height - 10
        bg_x2 = x1 + max_text_width + 10
        bg_y2 = y1
        if bg_y1 < 0:
            bg_y1 = y2
            bg_y2 = y2 + total_text_height + 10

        self._shade_region(frame, bg_x1, bg_y1, bg_x2, bg_y2)

        current_y = bg_y1 + 15
        for (text, text_color), line_height in zip(labels, line_heights):
            cv2.putText(frame, text, (x1 + 5, current_y),
                        self._FONT, self._LABEL_SCALE, text_color, 1)
            current_y += line_height

        return frame

    def set_quality_thresholds(self, clarity_threshold: Optional[float] = None,
                               quality_threshold: Optional[float] = None,
                               min_face_size: Optional[int] = None,
                               pitch_threshold: Optional[float] = None,
                               yaw_threshold: Optional[float] = None):
        """Update quality thresholds; arguments left as None are unchanged.

        Args:
            clarity_threshold: Minimum Laplacian variance of a face crop.
            quality_threshold: Quality score threshold.
            min_face_size: Minimum face side length in pixels.
            pitch_threshold: Maximum accepted |pitch|.
            yaw_threshold: Maximum accepted |yaw|.
        """
        if clarity_threshold is not None:
            self.clarity_threshold = clarity_threshold
        if quality_threshold is not None:
            self.quality_threshold = quality_threshold
        if min_face_size is not None:
            self.min_face_size = min_face_size
        if pitch_threshold is not None:
            self.pitch_threshold = pitch_threshold
        if yaw_threshold is not None:
            self.yaw_threshold = yaw_threshold

        print(
            f"✅ 质量阈值更新 - 清晰度: {self.clarity_threshold}, 质量得分: {self.quality_threshold}, 最小尺寸: {self.min_face_size}")

    def _open_writer(self, output_path: Optional[str], fps: float,
                     size: Tuple[int, int]):
        """Create an mp4v ``VideoWriter``; return None when not possible.

        The output directory is created when missing, and a reported frame
        rate of zero or less is replaced by a fallback value.
        """
        if not output_path:
            return None

        out_dir = os.path.dirname(output_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        if not fps or fps <= 0:
            fps = self._FALLBACK_FPS

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(output_path, fourcc, fps, size)
        if not writer.isOpened():
            # Keep processing without an output file instead of silently
            # writing nothing.
            print(f"❌ 无法创建输出视频: {output_path}")
            writer.release()
            return None
        return writer

    def process_video_file(self, video_path: str, output_path: Optional[str] = None,
                           skip_frames: int = 0, show_preview: bool = True):
        """Run recognition over a video file.

        Args:
            video_path: Input video path.
            output_path: Output video path; nothing is written when None.
            skip_frames: Number of frames skipped after each processed
                frame, to speed processing up. Negative values count as 0.
            show_preview: Show a live preview window ('q' stops processing).
        """
        if not os.path.exists(video_path):
            print(f"❌ 视频文件不存在: {video_path}")
            return

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"❌ 无法打开视频文件: {video_path}")
            cap.release()
            return

        skip_frames = max(0, int(skip_frames))
        stride = skip_frames + 1

        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")

        # Only every `stride`-th frame is written, so the output frame rate
        # is reduced to keep the playback speed.
        source_fps = fps if fps and fps > 0 else self._FALLBACK_FPS
        out = self._open_writer(output_path, source_fps / stride, (width, height))

        frame_index = 0
        processed_frames = 0
        # Only timings recorded during this call enter this run's average.
        timings_offset = len(self.processing_times)
        start_time = time.time()

        print("🚀 开始处理视频...")

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Frame skipping.
                if frame_index % stride != 0:
                    frame_index += 1
                    continue

                processed_frame, results = self.process_frame(frame)

                if out is not None:
                    out.write(processed_frame)

                if show_preview:
                    # The overlay is drawn after writing, so it appears in
                    # the preview only.
                    fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)}"
                    cv2.putText(processed_frame, fps_text, (10, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

                    cv2.imshow('Video Face Recognition', processed_frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break

                frame_index += 1
                processed_frames += 1

                # Report every 30 processed frames. Testing frame_index here
                # would never fire when the stride shares no phase with 30
                # (e.g. skip_frames=1 only ever yields odd indices).
                if processed_frames % 30 == 0:
                    # Some containers report a frame count of 0.
                    progress = (frame_index / total_frames) * 100 if total_frames > 0 else 0.0
                    print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})")
        finally:
            # Release everything even when processing raises.
            cap.release()
            if out is not None:
                out.release()
            if show_preview:
                cv2.destroyAllWindows()

        total_time = time.time() - start_time
        run_timings = self.processing_times[timings_offset:]
        avg_processing_time = np.mean(run_timings) if run_timings else 0
        actual_fps = processed_frames / total_time if total_time > 0 else 0.0

        print("\n🎉 视频处理完成!")
        print("📊 性能统计:")
        print(f" 总处理帧数: {processed_frames}")
        print(f" 总耗时: {total_time:.1f}秒")
        print(f" 平均每帧: {avg_processing_time:.1f}ms")
        print(f" 实际FPS: {actual_fps:.1f}")
        if out is not None:
            print(f" 输出视频: {output_path}")

    def process_webcam(self, camera_id: int = 0, output_path: Optional[str] = None):
        """Run recognition on a live camera stream until 'q' is pressed.

        Args:
            camera_id: Camera index passed to ``cv2.VideoCapture``.
            output_path: When given, the annotated stream is recorded there.
        """
        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            print(f"❌ 无法打开摄像头 {camera_id}")
            cap.release()
            return

        # Request a capture resolution (optional).
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

        out = None
        if output_path:
            # Read the size back after the request above.
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            out = self._open_writer(output_path, cap.get(cv2.CAP_PROP_FPS), (width, height))

        print("🎥 开始摄像头实时识别 (按 'q' 退出)...")

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("❌ 无法读取摄像头帧")
                    break

                processed_frame, results = self.process_frame(frame)

                # Live overlay; guard against a zero measured duration.
                last_ms = self.processing_times[-1] if self.processing_times else 0
                current_fps = 1000 / last_ms if last_ms > 0 else 0
                info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}"
                cv2.putText(processed_frame, info_text, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

                if out is not None:
                    out.write(processed_frame)

                cv2.imshow('Real-time Face Recognition', processed_frame)

                # Press 'q' to quit.
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release everything even when processing raises.
            cap.release()
            if out is not None:
                out.release()
            cv2.destroyAllWindows()

        print("✅ 摄像头处理结束")


# Usage example
def main():
    """Run the demo: register a target face and process a sample video."""
    video_system = VideoFaceRecognition(use_gpu=True)

    # Quality thresholds (tune for the footage at hand).
    video_system.set_quality_thresholds(
        clarity_threshold=1000.0,  # sharpness threshold
        quality_threshold=0.6,  # quality score threshold
        min_face_size=30
    )

    # Target face (optional).
    target_image = "test_data/register/sy.jpg"
    if os.path.exists(target_image):
        video_system.set_target_face(target_image, "目标人物")

    # Processing mode: "1" = video file, "2" = live camera. Hard-coded here;
    # read it with input() for an interactive choice.
    choice = "1"

    if choice == "1":
        # Process a video file.
        video_path = "test_data/video/video_1.mp4"
        output_path = "test_data/output_video/video_6_quality.mp4"

        # Performance: process every second frame.
        skip_frames = 1

        video_system.process_video_file(
            video_path=video_path,
            output_path=output_path,
            skip_frames=skip_frames,
            show_preview=True
        )

    elif choice == "2":
        # Live camera.
        output_path = "webcam_recording.mp4"
        video_system.process_webcam(
            camera_id=0,
            output_path=output_path
        )

    else:
        print("❌ 无效选择")


if __name__ == "__main__":
    main()