From 93f7e11f9ee6c47717231431b510e9d9a7625703 Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Sun, 21 Dec 2025 12:43:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BB=8E=E4=B8=80=E4=B8=B2?= =?UTF-8?q?=E8=A7=86=E9=A2=91=E4=B8=AD=E6=A0=87=E8=AE=B0=E9=BB=91=E5=90=8D?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/video_check_biz.py | 276 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 275 insertions(+), 1 deletion(-) diff --git a/src/video_check_biz.py b/src/video_check_biz.py index 0520cb7..3dd43f0 100644 --- a/src/video_check_biz.py +++ b/src/video_check_biz.py @@ -26,6 +26,45 @@ class VideoCheckBiz(BaseFaceBiz): face_analysis: 已初始化好的FaceAnalysis实例 """ super().__init__(face_analysis) + + def draw_detections(self, frame: np.ndarray, results: List[Dict]) -> np.ndarray: + """ + 重写绘制检测结果方法 + 只在检测到黑名单匹配时用红色绘制人脸框 + + 参数: + frame: 原始帧图像 + results: 检测结果列表 + + 返回: + 绘制后的帧图像 + """ + for result in results: + # 只在黑名单匹配时绘制 + if result['is_match']: + bbox = result['bbox'] + + # 使用红色绘制人脸框 + x1, y1, x2, y2 = bbox + cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2) + + # 添加简单的匹配信息 + best_match = result['best_match'] + similarity = result['similarity'] + + # 绘制匹配信息 + text = f"{best_match}: {similarity:.3f}" + text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0] + + # 绘制文本背景 + cv2.rectangle(frame, (x1, y1 - text_size[1] - 5), + (x1 + text_size[0], y1), (0, 0, 0), -1) + + # 绘制文本 + cv2.putText(frame, text, (x1, y1 - 5), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1) + + return frame def extract_best_face_from_video(self, video_path: str, frame_skip: int = 10) -> Optional[np.ndarray]: """ @@ -214,4 +253,239 @@ class VideoCheckBiz(BaseFaceBiz): } } - return None \ No newline at end of file + return None + + def batch_process_videos_with_blacklist_detection(self, video_paths: List[str], frame_skip: int = 10, + suffix: str = "_processed") -> List[str]: + """ + 批量处理视频文件,进行黑名单检测并保存结果 + + 参数: + video_paths: 视频文件路径列表 + frame_skip: 跳帧数,每隔多少帧处理一帧 + suffix: 输出文件后缀 + + 返回: + 处理后的视频路径列表 + """ + processed_paths = [] + + # 确保使用黑名单模式 + self.set_list_mode("blacklist") + + print(f"🎯 开始批量处理视频: 共{len(video_paths)}个视频") + print(f"🔍 检测模式: 黑名单模式, 跳帧数: {frame_skip}") + + for i, video_path in enumerate(video_paths, 1): + print(f"\n--- 处理第{i}/{len(video_paths)}个视频: {os.path.basename(video_path)} ---") + + if not os.path.exists(video_path): + print(f"❌ 视频文件不存在: {video_path}") + continue + + # 处理单个视频 + output_path = self._process_single_video_with_detection(video_path, frame_skip, suffix) + + if output_path: + processed_paths.append(output_path) + print(f"✅ 视频处理完成: {os.path.basename(output_path)}") + else: + print(f"❌ 视频处理失败: {os.path.basename(video_path)}") + + print(f"\n🎉 批量处理完成: 成功{len(processed_paths)}/{len(video_paths)}个视频") + return processed_paths + + def _process_single_video_with_detection(self, video_path: str, frame_skip: int, suffix: str) -> Optional[str]: + """ + 处理单个视频,进行黑名单检测并保存结果 + + 参数: + video_path: 视频文件路径 + frame_skip: 跳帧数 + suffix: 输出文件后缀 + + 返回: + 处理后的视频路径 + """ + # 打开输入视频 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + print(f"❌ 无法打开视频文件: {video_path}") + return None + + # 获取视频信息 + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + + # 创建输出路径 + video_dir = os.path.dirname(video_path) + video_name = os.path.splitext(os.path.basename(video_path))[0] + output_path = os.path.join(video_dir, f"{video_name}{suffix}.mp4") + + # 创建视频写入器 + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + + if not out.isOpened(): + print(f"❌ 无法创建输出文件: {output_path}") + cap.release() + return None + + print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, {total_frames}帧") + + frame_index = 0 + processed_frames = 0 + detection_count = 0 + + while True: + ret, frame = cap.read() + if not ret: + break + + # 处理帧:每隔frame_skip帧处理一次 + if frame_index % frame_skip == 0: + processed_frames += 1 + + # 人脸检测和识别 + faces = self.app.get(frame) + results = [] + + for face in faces: + # 检查人脸质量 + is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame) + + # 查找最佳匹配 + best_name, similarity = self.find_best_match(face.embedding) + + # 黑名单检测:在黑名单中即为匹配 + is_match = best_name is not None and similarity >= self.similarity_threshold + + result = { + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'best_match': best_name, + 'is_match': is_match, + 'det_score': float(face.det_score), + 'quality_metrics': quality_metrics, + 'is_acceptable': is_acceptable + } + results.append(result) + + if is_match: + detection_count += 1 + + # 绘制检测结果 + if results: + frame = self.draw_detections(frame, results) + + # 写入处理后的帧 + out.write(frame) + frame_index += 1 + + # 显示进度 + if frame_index % 100 == 0: + print(f" 进度: {frame_index}/{total_frames}帧 ({frame_index/total_frames*100:.1f}%)") + + # 释放资源 + cap.release() + out.release() + + print(f"📊 处理统计: 处理{processed_frames}帧, 检测到{detection_count}次黑名单匹配") + + return output_path + + def process_video_with_custom_detection(self, video_path: str, frame_skip: int = 10, + suffix: str = "_detected", + custom_draw_callback=None) -> Optional[str]: + """ + 处理视频并进行自定义检测(可扩展版本) + + 参数: + video_path: 视频文件路径 + frame_skip: 跳帧数 + suffix: 输出文件后缀 + custom_draw_callback: 自定义绘制回调函数 + + 返回: + 处理后的视频路径 + """ + # 打开输入视频 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + print(f"❌ 无法打开视频文件: {video_path}") + return None + + # 获取视频信息 + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + # 创建输出路径 + video_dir = os.path.dirname(video_path) + video_name = os.path.splitext(os.path.basename(video_path))[0] + output_path = os.path.join(video_dir, f"{video_name}{suffix}.mp4") + + # 创建视频写入器 + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + + if not out.isOpened(): + print(f"❌ 无法创建输出文件: {output_path}") + cap.release() + return None + + frame_index = 0 + + while True: + ret, frame = cap.read() + if not ret: + break + + # 处理帧:每隔frame_skip帧处理一次 + if frame_index % frame_skip == 0: + # 人脸检测和识别 + faces = self.app.get(frame) + results = [] + + for face in faces: + # 检查人脸质量 + is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame) + + # 查找最佳匹配 + best_name, similarity = self.find_best_match(face.embedding) + + # 根据名单模式判断是否匹配 + if self.list_mode == "blacklist": + is_match = best_name is not None and similarity >= self.similarity_threshold + else: # whitelist + is_match = best_name is not None and similarity >= self.similarity_threshold + + result = { + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'best_match': best_name, + 'is_match': is_match, + 'det_score': float(face.det_score), + 'quality_metrics': quality_metrics, + 'is_acceptable': is_acceptable + } + results.append(result) + + # 使用自定义绘制或默认绘制 + if custom_draw_callback: + frame = custom_draw_callback(frame, results, self) + else: + frame = self.draw_detections(frame, results) + + # 写入处理后的帧 + out.write(frame) + frame_index += 1 + + # 释放资源 + cap.release() + out.release() + + print(f"✅ 视频处理完成: {output_path}") + return output_path \ No newline at end of file