""" 视频检查业务类 继承自BaseFaceBiz,专门处理视频中的人脸特征提取 """ import cv2 import numpy as np from typing import Optional, List, Dict import os from insightface.app import FaceAnalysis from biz.base_face_biz import BaseFaceBiz class VideoCheckBiz(BaseFaceBiz): """ 视频检查业务类 专门处理视频中的人脸特征提取和质量评估 """ def __init__(self, face_analysis: FaceAnalysis): """ 初始化视频检查业务类 参数: face_analysis: 已初始化好的FaceAnalysis实例 """ super().__init__(face_analysis) def draw_detections(self, frame: np.ndarray, results: List[Dict]) -> np.ndarray: """ 重写绘制检测结果方法 只在检测到黑名单匹配时用红色绘制人脸框 参数: frame: 原始帧图像 results: 检测结果列表 返回: 绘制后的帧图像 """ for result in results: # 只在黑名单匹配时绘制 if result['is_match']: bbox = result['bbox'] # 使用红色绘制人脸框 x1, y1, x2, y2 = bbox cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2) # 添加简单的匹配信息 best_match = result['best_match'] similarity = result['similarity'] # 绘制匹配信息 text = f"{best_match}: {similarity:.3f}" text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0] # 绘制文本背景 cv2.rectangle(frame, (x1, y1 - text_size[1] - 5), (x1 + text_size[0], y1), (0, 0, 0), -1) # 绘制文本 cv2.putText(frame, text, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1) return frame def extract_best_face_from_video(self, video_path: str, frame_skip: int = 10) -> Optional[np.ndarray]: """ 从视频中提取最佳人脸特征 参数: video_path: 视频文件路径 frame_skip: 跳帧数,每隔多少帧取一帧 返回: 最佳人脸的特征向量,如果未找到合适的人脸则返回None """ if not os.path.exists(video_path): print(f"❌ 视频文件不存在: {video_path}") return None # 打开视频文件 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"❌ 无法打开视频文件: {video_path}") return None total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) print(f"📹 视频信息: {total_frames}帧, {fps:.1f}FPS") print(f"🔍 跳帧设置: 每{frame_skip}帧取一帧") best_quality_score = -1.0 best_feature = None best_frame_info = None processed_frames = 0 valid_frames = 0 frame_index = 0 while True: ret, frame = cap.read() if not ret: break # 跳过指定帧数 if frame_index % frame_skip != 0: frame_index += 1 continue processed_frames += 1 # 人脸检测 faces = self.app.get(frame) # 检查人脸数量 if len(faces) == 0: # 无人脸,跳过 frame_index += 1 continue elif len(faces) > 1: # 超过1个人脸,舍弃该帧 frame_index += 1 continue # 只有1个人脸 face = faces[0] # 检查人脸质量 is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame) if not is_acceptable: # 质量不可接受,跳过 frame_index += 1 continue valid_frames += 1 # 计算综合质量得分 quality_score = self._calculate_face_quality_score(face, quality_metrics) # 更新最佳人脸 if quality_score > best_quality_score: best_quality_score = quality_score best_feature = face.embedding best_frame_info = { 'frame_index': frame_index, 'quality_score': quality_score, 'quality_metrics': quality_metrics } frame_index += 1 cap.release() # 输出结果统计 print(f"📊 处理统计: 共处理{processed_frames}帧, 有效帧{valid_frames}帧") if best_feature is not None: print(f"✅ 找到最佳人脸: 帧{best_frame_info['frame_index']}, 质量得分: {best_quality_score:.3f}") print(f" 检测得分: {best_frame_info['quality_metrics']['det_score']:.3f}") print(f" 清晰度: {best_frame_info['quality_metrics']['clarity_score']:.1f}") print(f" 姿态角度: pitch={best_frame_info['quality_metrics']['pitch']:.1f}°, yaw={best_frame_info['quality_metrics']['yaw']:.1f}°") print(f" 人脸尺寸: {best_frame_info['quality_metrics']['bbox_width']}x{best_frame_info['quality_metrics']['bbox_height']}") else: print("❌ 未找到合适的人脸") return best_feature def _calculate_face_quality_score(self, face, quality_metrics: Dict) -> float: """ 计算人脸的综合质量得分 参数: face: 人脸检测结果 quality_metrics: 质量指标字典 返回: 综合质量得分 (0-1) """ # 基础得分:检测置信度 base_score = quality_metrics['det_score'] # 清晰度得分 (归一化到0-1) clarity_score = min(quality_metrics['clarity_score'] / self.clarity_threshold, 1.0) # 姿态得分 (基于角度偏差) pitch_score = 1.0 - min(abs(quality_metrics['pitch']) / self.pitch_threshold, 1.0) yaw_score = 1.0 - min(abs(quality_metrics['yaw']) / self.yaw_threshold, 1.0) # 尺寸得分 (基于最小尺寸要求) min_dim = min(quality_metrics['bbox_width'], quality_metrics['bbox_height']) size_score = min(min_dim / self.min_face_size, 1.0) # 综合得分权重 weights = { 'base': 0.3, # 检测置信度 'clarity': 0.3, # 清晰度 'pose': 0.2, # 姿态 'size': 0.2 # 尺寸 } # 计算综合得分 pose_score = (pitch_score + yaw_score) / 2 total_score = ( base_score * weights['base'] + clarity_score * weights['clarity'] + pose_score * weights['pose'] + size_score * weights['size'] ) return total_score def extract_best_face_from_video_with_details(self, video_path: str, frame_skip: int = 10) -> Optional[Dict]: """ 从视频中提取最佳人脸特征(带详细信息) 参数: video_path: 视频文件路径 frame_skip: 跳帧数 返回: 包含特征向量和详细信息的字典,如果未找到合适的人脸则返回None """ best_feature = self.extract_best_face_from_video(video_path, frame_skip) if best_feature is not None: # 重新处理视频获取详细信息 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return { 'feature': best_feature, 'frame_info': None, 'video_info': { 'path': video_path, 'frame_skip': frame_skip } } # 这里可以添加更多详细信息的提取逻辑 cap.release() return { 'feature': best_feature, 'frame_info': { 'frame_skip': frame_skip }, 'video_info': { 'path': video_path } } return None def batch_process_videos_with_blacklist_detection(self, video_infos: List[Dict], frame_skip: int = 10, suffix: str = "_processed") -> List[Dict]: """ 批量处理视频文件,进行黑名单检测并保存结果 参数: video_infos: 视频信息列表,每个元素包含: { 'video_id': 视频ID (可选), 'video_name': 视频名称 (可选), 'video_path': 视频文件路径 (必需) } frame_skip: 跳帧数,每隔多少帧处理一帧 suffix: 输出文件后缀 返回: 处理结果列表,每个元素包含: { 'video_id': 视频ID, 'video_name': 视频名称, 'original_path': 原视频路径, 'processed_path': 处理后的视频路径, 'has_blacklist_match': 是否检测到黑名单中人脸, 'detection_count': 黑名单匹配次数, 'total_frames_processed': 处理的总帧数, 'blacklist_matches': 黑名单匹配详情列表 } """ results = [] # 确保使用黑名单模式 self.set_list_mode("0") print(f"🎯 开始批量处理视频: 共{len(video_infos)}个视频") print(f"🔍 检测模式: 黑名单模式, 跳帧数: {frame_skip}") for i, video_info in enumerate(video_infos, 1): video_id = video_info.get('video_id') video_name = video_info.get('video_name') video_path = video_info.get('video_path') if not video_path: print(f"❌ 视频信息缺少路径: {video_info}") results.append({ 'video_id': video_id, 'video_name': video_name, 'original_path': None, 'processed_path': None, 'has_blacklist_match': False, 'detection_count': 0, 'total_frames_processed': 0, 'blacklist_matches': [], 'error': '缺少视频路径' }) continue video_info_str = f"(ID: {video_id}, 名称: {video_name})" if video_id else "" print(f"\n--- 处理第{i}/{len(video_infos)}个视频: {os.path.basename(video_path)} {video_info_str} ---") if not os.path.exists(video_path): print(f"❌ 视频文件不存在: {video_path}") results.append({ 'video_id': video_id, 'video_name': video_name, 'original_path': video_path, 'processed_path': None, 'has_blacklist_match': False, 'detection_count': 0, 'total_frames_processed': 0, 'blacklist_matches': [], 'error': '文件不存在' }) continue # 处理单个视频 result = self._process_single_video_with_detection(video_path, frame_skip, suffix) if result and result['processed_path']: # 添加视频ID和名称到结果中 result['video_id'] = video_id result['video_name'] = video_name results.append(result) status = "✅ 检测到黑名单" if result['has_blacklist_match'] else "⚠️ 未检测到黑名单" video_info_str = f"(ID: {video_id}, 名称: {video_name})" if video_id else "" print(f"{status}: {os.path.basename(result['processed_path'])} (匹配次数: {result['detection_count']}) {video_info_str}") else: error_result = { 'video_id': video_id, 'video_name': video_name, 'original_path': video_path, 'processed_path': None, 'has_blacklist_match': False, 'detection_count': 0, 'total_frames_processed': 0, 'blacklist_matches': [], 'error': '处理失败' } results.append(error_result) video_info_str = f"(ID: {video_id}, 名称: {video_name})" if video_id else "" print(f"❌ 视频处理失败: {os.path.basename(video_path)} {video_info_str}") # 统计结果 success_count = sum(1 for r in results if r.get('processed_path')) blacklist_count = sum(1 for r in results if r.get('has_blacklist_match')) print(f"\n🎉 批量处理完成: 成功{success_count}/{len(video_infos)}个视频") print(f"🔍 黑名单检测结果: {blacklist_count}个视频检测到黑名单中人脸") return results def _process_single_video_with_detection(self, video_path: str, frame_skip: int, suffix: str) -> Optional[Dict]: """ 处理单个视频,进行黑名单检测并保存结果 只保留处理的帧,帧率控制在15-23帧之间 参数: video_path: 视频文件路径 frame_skip: 跳帧数 suffix: 输出文件后缀 返回: 处理结果字典,包含: { 'original_path': 原视频路径, 'processed_path': 处理后的视频路径, 'has_blacklist_match': 是否检测到黑名单中人脸, 'detection_count': 黑名单匹配次数, 'total_frames_processed': 处理的总帧数, 'blacklist_matches': 黑名单匹配详情列表 } """ # 打开输入视频 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"❌ 无法打开视频文件: {video_path}") return None # 获取视频信息 original_fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # 计算输出视频的帧率 # 目标帧率在15-23帧之间 target_fps = min(max(original_fps / frame_skip, 15), 23) # 创建输出路径 video_dir = os.path.dirname(video_path) video_name = os.path.splitext(os.path.basename(video_path))[0] output_path = os.path.join(video_dir, f"{video_name}{suffix}.webm") # 创建视频写入器,使用计算出的目标帧率 # 使用WebM格式(VP8编码),现代浏览器原生支持 fourcc = cv2.VideoWriter_fourcc(*'VP80') out = cv2.VideoWriter(output_path, fourcc, target_fps, (width, height)) if not out.isOpened(): print(f"❌ 无法创建输出文件: {output_path}") cap.release() return None print(f"📹 原视频信息: {width}x{height}, {original_fps:.1f}FPS, {total_frames}帧") print(f"🎯 输出视频帧率: {target_fps:.1f}FPS (目标范围: 15-23FPS)") frame_index = 0 processed_frames = 0 detection_count = 0 processed_frames_list = [] # 记录处理过的帧 while True: ret, frame = cap.read() if not ret: break # 处理帧:每隔frame_skip帧处理一次 if frame_index % frame_skip == 0: processed_frames += 1 # 人脸检测和识别 faces = self.app.get(frame) results = [] for face in faces: # 检查人脸质量 is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame) # 查找最佳匹配 best_name, similarity = self.find_best_match(face.embedding) # 黑名单检测:在黑名单中即为匹配 is_match = best_name is not None and similarity >= self.similarity_threshold result = { 'bbox': face.bbox.astype(int).tolist(), 'similarity': similarity, 'best_match': best_name, 'is_match': is_match, 'det_score': float(face.det_score), 'quality_metrics': quality_metrics, 'is_acceptable': is_acceptable } results.append(result) if is_match: detection_count += 1 # 绘制检测结果 if results: frame = self.draw_detections(frame, results) # 只写入处理过的帧 out.write(frame) processed_frames_list.append({ 'frame_index': frame_index, 'has_detection': len(results) > 0, 'has_blacklist_match': any(r['is_match'] for r in results) }) frame_index += 1 # 显示进度 if frame_index % 100 == 0: print(f" 进度: {frame_index}/{total_frames}帧 ({frame_index/total_frames*100:.1f}%)") # 释放资源 cap.release() out.release() # 统计处理结果 frames_with_blacklist = sum(1 for f in processed_frames_list if f['has_blacklist_match']) print(f"📊 处理统计: 处理{processed_frames}帧, 检测到{detection_count}次黑名单匹配") print(f"🎬 输出视频: {len(processed_frames_list)}帧, {target_fps:.1f}FPS") # 返回详细结果 return { 'original_path': video_path, 'processed_path': output_path, 'has_blacklist_match': detection_count > 0, 'detection_count': detection_count, 'total_frames_processed': processed_frames, 'output_fps': target_fps, 'output_frame_count': len(processed_frames_list), 'blacklist_matches': [{ 'frame_index': frame_index, 'best_match': result['best_match'], 'similarity': result['similarity'], 'bbox': result['bbox'] } for result in results if result['is_match']] } def process_video_with_custom_detection(self, video_path: str, frame_skip: int = 10, suffix: str = "_detected", custom_draw_callback=None) -> Optional[str]: """ 处理视频并进行自定义检测(可扩展版本) 参数: video_path: 视频文件路径 frame_skip: 跳帧数 suffix: 输出文件后缀 custom_draw_callback: 自定义绘制回调函数 返回: 处理后的视频路径 """ # 打开输入视频 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"❌ 无法打开视频文件: {video_path}") return None # 获取视频信息 fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 创建输出路径 video_dir = os.path.dirname(video_path) video_name = os.path.splitext(os.path.basename(video_path))[0] output_path = os.path.join(video_dir, f"{video_name}{suffix}.webm") # 创建视频写入器 # 使用WebM格式(VP8编码),现代浏览器原生支持 fourcc = cv2.VideoWriter_fourcc(*'VP80') out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) if not out.isOpened(): print(f"❌ 无法创建输出文件: {output_path}") cap.release() return None frame_index = 0 while True: ret, frame = cap.read() if not ret: break # 处理帧:每隔frame_skip帧处理一次 if frame_index % frame_skip == 0: # 人脸检测和识别 faces = self.app.get(frame) results = [] for face in faces: # 检查人脸质量 is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame) # 查找最佳匹配 best_name, similarity = self.find_best_match(face.embedding) # 根据名单模式判断是否匹配 if self.list_mode == "0": is_match = best_name is not None and similarity >= self.similarity_threshold else: # whitelist is_match = best_name is not None and similarity >= self.similarity_threshold result = { 'bbox': face.bbox.astype(int).tolist(), 'similarity': similarity, 'best_match': best_name, 'is_match': is_match, 'det_score': float(face.det_score), 'quality_metrics': quality_metrics, 'is_acceptable': is_acceptable } results.append(result) # 使用自定义绘制或默认绘制 if custom_draw_callback: frame = custom_draw_callback(frame, results, self) else: frame = self.draw_detections(frame, results) # 写入处理后的帧 out.write(frame) frame_index += 1 # 释放资源 cap.release() out.release() print(f"✅ 视频处理完成: {output_path}") return output_path