Files
SupervisorAI/src/video_check_biz.py

491 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
视频检查业务类
继承自BaseFaceBiz专门处理视频中的人脸特征提取
"""
import cv2
import numpy as np
from typing import Optional, List, Dict, Tuple
import os
from insightface.app import FaceAnalysis
from src.base_face_biz import BaseFaceBiz
class VideoCheckBiz(BaseFaceBiz):
"""
视频检查业务类
专门处理视频中的人脸特征提取和质量评估
"""
def __init__(self, face_analysis: FaceAnalysis):
"""
初始化视频检查业务类
参数:
face_analysis: 已初始化好的FaceAnalysis实例
"""
super().__init__(face_analysis)
def draw_detections(self, frame: np.ndarray, results: List[Dict]) -> np.ndarray:
"""
重写绘制检测结果方法
只在检测到黑名单匹配时用红色绘制人脸框
参数:
frame: 原始帧图像
results: 检测结果列表
返回:
绘制后的帧图像
"""
for result in results:
# 只在黑名单匹配时绘制
if result['is_match']:
bbox = result['bbox']
# 使用红色绘制人脸框
x1, y1, x2, y2 = bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
# 添加简单的匹配信息
best_match = result['best_match']
similarity = result['similarity']
# 绘制匹配信息
text = f"{best_match}: {similarity:.3f}"
text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
# 绘制文本背景
cv2.rectangle(frame, (x1, y1 - text_size[1] - 5),
(x1 + text_size[0], y1), (0, 0, 0), -1)
# 绘制文本
cv2.putText(frame, text, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
return frame
def extract_best_face_from_video(self, video_path: str, frame_skip: int = 10) -> Optional[np.ndarray]:
"""
从视频中提取最佳人脸特征
参数:
video_path: 视频文件路径
frame_skip: 跳帧数,每隔多少帧取一帧
返回:
最佳人脸的特征向量如果未找到合适的人脸则返回None
"""
if not os.path.exists(video_path):
print(f"❌ 视频文件不存在: {video_path}")
return None
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 无法打开视频文件: {video_path}")
return None
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
print(f"📹 视频信息: {total_frames}帧, {fps:.1f}FPS")
print(f"🔍 跳帧设置: 每{frame_skip}帧取一帧")
best_quality_score = -1.0
best_feature = None
best_frame_info = None
processed_frames = 0
valid_frames = 0
frame_index = 0
while True:
ret, frame = cap.read()
if not ret:
break
# 跳过指定帧数
if frame_index % frame_skip != 0:
frame_index += 1
continue
processed_frames += 1
# 人脸检测
faces = self.app.get(frame)
# 检查人脸数量
if len(faces) == 0:
# 无人脸,跳过
frame_index += 1
continue
elif len(faces) > 1:
# 超过1个人脸舍弃该帧
frame_index += 1
continue
# 只有1个人脸
face = faces[0]
# 检查人脸质量
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
if not is_acceptable:
# 质量不可接受,跳过
frame_index += 1
continue
valid_frames += 1
# 计算综合质量得分
quality_score = self._calculate_face_quality_score(face, quality_metrics)
# 更新最佳人脸
if quality_score > best_quality_score:
best_quality_score = quality_score
best_feature = face.embedding
best_frame_info = {
'frame_index': frame_index,
'quality_score': quality_score,
'quality_metrics': quality_metrics
}
frame_index += 1
cap.release()
# 输出结果统计
print(f"📊 处理统计: 共处理{processed_frames}帧, 有效帧{valid_frames}")
if best_feature is not None:
print(f"✅ 找到最佳人脸: 帧{best_frame_info['frame_index']}, 质量得分: {best_quality_score:.3f}")
print(f" 检测得分: {best_frame_info['quality_metrics']['det_score']:.3f}")
print(f" 清晰度: {best_frame_info['quality_metrics']['clarity_score']:.1f}")
print(f" 姿态角度: pitch={best_frame_info['quality_metrics']['pitch']:.1f}°, yaw={best_frame_info['quality_metrics']['yaw']:.1f}°")
print(f" 人脸尺寸: {best_frame_info['quality_metrics']['bbox_width']}x{best_frame_info['quality_metrics']['bbox_height']}")
else:
print("❌ 未找到合适的人脸")
return best_feature
def _calculate_face_quality_score(self, face, quality_metrics: Dict) -> float:
"""
计算人脸的综合质量得分
参数:
face: 人脸检测结果
quality_metrics: 质量指标字典
返回:
综合质量得分 (0-1)
"""
# 基础得分:检测置信度
base_score = quality_metrics['det_score']
# 清晰度得分 (归一化到0-1)
clarity_score = min(quality_metrics['clarity_score'] / self.clarity_threshold, 1.0)
# 姿态得分 (基于角度偏差)
pitch_score = 1.0 - min(abs(quality_metrics['pitch']) / self.pitch_threshold, 1.0)
yaw_score = 1.0 - min(abs(quality_metrics['yaw']) / self.yaw_threshold, 1.0)
# 尺寸得分 (基于最小尺寸要求)
min_dim = min(quality_metrics['bbox_width'], quality_metrics['bbox_height'])
size_score = min(min_dim / self.min_face_size, 1.0)
# 综合得分权重
weights = {
'base': 0.3, # 检测置信度
'clarity': 0.3, # 清晰度
'pose': 0.2, # 姿态
'size': 0.2 # 尺寸
}
# 计算综合得分
pose_score = (pitch_score + yaw_score) / 2
total_score = (
base_score * weights['base'] +
clarity_score * weights['clarity'] +
pose_score * weights['pose'] +
size_score * weights['size']
)
return total_score
def extract_best_face_from_video_with_details(self, video_path: str, frame_skip: int = 10) -> Optional[Dict]:
"""
从视频中提取最佳人脸特征(带详细信息)
参数:
video_path: 视频文件路径
frame_skip: 跳帧数
返回:
包含特征向量和详细信息的字典如果未找到合适的人脸则返回None
"""
best_feature = self.extract_best_face_from_video(video_path, frame_skip)
if best_feature is not None:
# 重新处理视频获取详细信息
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return {
'feature': best_feature,
'frame_info': None,
'video_info': {
'path': video_path,
'frame_skip': frame_skip
}
}
# 这里可以添加更多详细信息的提取逻辑
cap.release()
return {
'feature': best_feature,
'frame_info': {
'frame_skip': frame_skip
},
'video_info': {
'path': video_path
}
}
return None
def batch_process_videos_with_blacklist_detection(self, video_paths: List[str], frame_skip: int = 10,
suffix: str = "_processed") -> List[str]:
"""
批量处理视频文件,进行黑名单检测并保存结果
参数:
video_paths: 视频文件路径列表
frame_skip: 跳帧数,每隔多少帧处理一帧
suffix: 输出文件后缀
返回:
处理后的视频路径列表
"""
processed_paths = []
# 确保使用黑名单模式
self.set_list_mode("blacklist")
print(f"🎯 开始批量处理视频: 共{len(video_paths)}个视频")
print(f"🔍 检测模式: 黑名单模式, 跳帧数: {frame_skip}")
for i, video_path in enumerate(video_paths, 1):
print(f"\n--- 处理第{i}/{len(video_paths)}个视频: {os.path.basename(video_path)} ---")
if not os.path.exists(video_path):
print(f"❌ 视频文件不存在: {video_path}")
continue
# 处理单个视频
output_path = self._process_single_video_with_detection(video_path, frame_skip, suffix)
if output_path:
processed_paths.append(output_path)
print(f"✅ 视频处理完成: {os.path.basename(output_path)}")
else:
print(f"❌ 视频处理失败: {os.path.basename(video_path)}")
print(f"\n🎉 批量处理完成: 成功{len(processed_paths)}/{len(video_paths)}个视频")
return processed_paths
def _process_single_video_with_detection(self, video_path: str, frame_skip: int, suffix: str) -> Optional[str]:
"""
处理单个视频,进行黑名单检测并保存结果
参数:
video_path: 视频文件路径
frame_skip: 跳帧数
suffix: 输出文件后缀
返回:
处理后的视频路径
"""
# 打开输入视频
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 无法打开视频文件: {video_path}")
return None
# 获取视频信息
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# 创建输出路径
video_dir = os.path.dirname(video_path)
video_name = os.path.splitext(os.path.basename(video_path))[0]
output_path = os.path.join(video_dir, f"{video_name}{suffix}.mp4")
# 创建视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
print(f"❌ 无法创建输出文件: {output_path}")
cap.release()
return None
print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, {total_frames}")
frame_index = 0
processed_frames = 0
detection_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
# 处理帧每隔frame_skip帧处理一次
if frame_index % frame_skip == 0:
processed_frames += 1
# 人脸检测和识别
faces = self.app.get(frame)
results = []
for face in faces:
# 检查人脸质量
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
# 查找最佳匹配
best_name, similarity = self.find_best_match(face.embedding)
# 黑名单检测:在黑名单中即为匹配
is_match = best_name is not None and similarity >= self.similarity_threshold
result = {
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'best_match': best_name,
'is_match': is_match,
'det_score': float(face.det_score),
'quality_metrics': quality_metrics,
'is_acceptable': is_acceptable
}
results.append(result)
if is_match:
detection_count += 1
# 绘制检测结果
if results:
frame = self.draw_detections(frame, results)
# 写入处理后的帧
out.write(frame)
frame_index += 1
# 显示进度
if frame_index % 100 == 0:
print(f" 进度: {frame_index}/{total_frames}帧 ({frame_index/total_frames*100:.1f}%)")
# 释放资源
cap.release()
out.release()
print(f"📊 处理统计: 处理{processed_frames}帧, 检测到{detection_count}次黑名单匹配")
return output_path
def process_video_with_custom_detection(self, video_path: str, frame_skip: int = 10,
suffix: str = "_detected",
custom_draw_callback=None) -> Optional[str]:
"""
处理视频并进行自定义检测(可扩展版本)
参数:
video_path: 视频文件路径
frame_skip: 跳帧数
suffix: 输出文件后缀
custom_draw_callback: 自定义绘制回调函数
返回:
处理后的视频路径
"""
# 打开输入视频
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 无法打开视频文件: {video_path}")
return None
# 获取视频信息
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 创建输出路径
video_dir = os.path.dirname(video_path)
video_name = os.path.splitext(os.path.basename(video_path))[0]
output_path = os.path.join(video_dir, f"{video_name}{suffix}.mp4")
# 创建视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
print(f"❌ 无法创建输出文件: {output_path}")
cap.release()
return None
frame_index = 0
while True:
ret, frame = cap.read()
if not ret:
break
# 处理帧每隔frame_skip帧处理一次
if frame_index % frame_skip == 0:
# 人脸检测和识别
faces = self.app.get(frame)
results = []
for face in faces:
# 检查人脸质量
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
# 查找最佳匹配
best_name, similarity = self.find_best_match(face.embedding)
# 根据名单模式判断是否匹配
if self.list_mode == "blacklist":
is_match = best_name is not None and similarity >= self.similarity_threshold
else: # whitelist
is_match = best_name is not None and similarity >= self.similarity_threshold
result = {
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'best_match': best_name,
'is_match': is_match,
'det_score': float(face.det_score),
'quality_metrics': quality_metrics,
'is_acceptable': is_acceptable
}
results.append(result)
# 使用自定义绘制或默认绘制
if custom_draw_callback:
frame = custom_draw_callback(frame, results, self)
else:
frame = self.draw_detections(frame, results)
# 写入处理后的帧
out.write(frame)
frame_index += 1
# 释放资源
cap.release()
out.release()
print(f"✅ 视频处理完成: {output_path}")
return output_path