# --- Repository web-viewer residue (not Python), preserved as a comment ---
# Files
# SupervisorAI/biz/video_check_biz.py
# 603 lines, 23 KiB, Python
# Raw Permalink Blame History
# Viewer warning (kept for reference): "This file contains ambiguous Unicode
# characters ... If you think that this is intentional, you can safely ignore
# this warning. Use the Escape button to reveal them."
# The original lines were bare prose and would have broken importing this module.
"""
视频检查业务类
继承自BaseFaceBiz专门处理视频中的人脸特征提取
"""
import cv2
import numpy as np
from typing import Optional, List, Dict
import os
from insightface.app import FaceAnalysis
from biz.base_face_biz import BaseFaceBiz
class VideoCheckBiz(BaseFaceBiz):
"""
视频检查业务类
专门处理视频中的人脸特征提取和质量评估
"""
def __init__(self, face_analysis: FaceAnalysis):
"""
初始化视频检查业务类
参数:
face_analysis: 已初始化好的FaceAnalysis实例
"""
super().__init__(face_analysis)
def draw_detections(self, frame: np.ndarray, results: List[Dict]) -> np.ndarray:
"""
重写绘制检测结果方法
只在检测到黑名单匹配时用红色绘制人脸框
参数:
frame: 原始帧图像
results: 检测结果列表
返回:
绘制后的帧图像
"""
for result in results:
# 只在黑名单匹配时绘制
if result['is_match']:
bbox = result['bbox']
# 使用红色绘制人脸框
x1, y1, x2, y2 = bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
# 添加简单的匹配信息
best_match = result['best_match']
similarity = result['similarity']
# 绘制匹配信息
text = f"{best_match}: {similarity:.3f}"
text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
# 绘制文本背景
cv2.rectangle(frame, (x1, y1 - text_size[1] - 5),
(x1 + text_size[0], y1), (0, 0, 0), -1)
# 绘制文本
cv2.putText(frame, text, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
return frame
def extract_best_face_from_video(self, video_path: str, frame_skip: int = 10) -> Optional[np.ndarray]:
"""
从视频中提取最佳人脸特征
参数:
video_path: 视频文件路径
frame_skip: 跳帧数,每隔多少帧取一帧
返回:
最佳人脸的特征向量如果未找到合适的人脸则返回None
"""
if not os.path.exists(video_path):
print(f"❌ 视频文件不存在: {video_path}")
return None
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 无法打开视频文件: {video_path}")
return None
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
print(f"📹 视频信息: {total_frames}帧, {fps:.1f}FPS")
print(f"🔍 跳帧设置: 每{frame_skip}帧取一帧")
best_quality_score = -1.0
best_feature = None
best_frame_info = None
processed_frames = 0
valid_frames = 0
frame_index = 0
while True:
ret, frame = cap.read()
if not ret:
break
# 跳过指定帧数
if frame_index % frame_skip != 0:
frame_index += 1
continue
processed_frames += 1
# 人脸检测
faces = self.app.get(frame)
# 检查人脸数量
if len(faces) == 0:
# 无人脸,跳过
frame_index += 1
continue
elif len(faces) > 1:
# 超过1个人脸舍弃该帧
frame_index += 1
continue
# 只有1个人脸
face = faces[0]
# 检查人脸质量
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
if not is_acceptable:
# 质量不可接受,跳过
frame_index += 1
continue
valid_frames += 1
# 计算综合质量得分
quality_score = self._calculate_face_quality_score(face, quality_metrics)
# 更新最佳人脸
if quality_score > best_quality_score:
best_quality_score = quality_score
best_feature = face.embedding
best_frame_info = {
'frame_index': frame_index,
'quality_score': quality_score,
'quality_metrics': quality_metrics
}
frame_index += 1
cap.release()
# 输出结果统计
print(f"📊 处理统计: 共处理{processed_frames}帧, 有效帧{valid_frames}")
if best_feature is not None:
print(f"✅ 找到最佳人脸: 帧{best_frame_info['frame_index']}, 质量得分: {best_quality_score:.3f}")
print(f" 检测得分: {best_frame_info['quality_metrics']['det_score']:.3f}")
print(f" 清晰度: {best_frame_info['quality_metrics']['clarity_score']:.1f}")
print(f" 姿态角度: pitch={best_frame_info['quality_metrics']['pitch']:.1f}°, yaw={best_frame_info['quality_metrics']['yaw']:.1f}°")
print(f" 人脸尺寸: {best_frame_info['quality_metrics']['bbox_width']}x{best_frame_info['quality_metrics']['bbox_height']}")
else:
print("❌ 未找到合适的人脸")
return best_feature
def _calculate_face_quality_score(self, face, quality_metrics: Dict) -> float:
"""
计算人脸的综合质量得分
参数:
face: 人脸检测结果
quality_metrics: 质量指标字典
返回:
综合质量得分 (0-1)
"""
# 基础得分:检测置信度
base_score = quality_metrics['det_score']
# 清晰度得分 (归一化到0-1)
clarity_score = min(quality_metrics['clarity_score'] / self.clarity_threshold, 1.0)
# 姿态得分 (基于角度偏差)
pitch_score = 1.0 - min(abs(quality_metrics['pitch']) / self.pitch_threshold, 1.0)
yaw_score = 1.0 - min(abs(quality_metrics['yaw']) / self.yaw_threshold, 1.0)
# 尺寸得分 (基于最小尺寸要求)
min_dim = min(quality_metrics['bbox_width'], quality_metrics['bbox_height'])
size_score = min(min_dim / self.min_face_size, 1.0)
# 综合得分权重
weights = {
'base': 0.3, # 检测置信度
'clarity': 0.3, # 清晰度
'pose': 0.2, # 姿态
'size': 0.2 # 尺寸
}
# 计算综合得分
pose_score = (pitch_score + yaw_score) / 2
total_score = (
base_score * weights['base'] +
clarity_score * weights['clarity'] +
pose_score * weights['pose'] +
size_score * weights['size']
)
return total_score
def extract_best_face_from_video_with_details(self, video_path: str, frame_skip: int = 10) -> Optional[Dict]:
"""
从视频中提取最佳人脸特征(带详细信息)
参数:
video_path: 视频文件路径
frame_skip: 跳帧数
返回:
包含特征向量和详细信息的字典如果未找到合适的人脸则返回None
"""
best_feature = self.extract_best_face_from_video(video_path, frame_skip)
if best_feature is not None:
# 重新处理视频获取详细信息
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return {
'feature': best_feature,
'frame_info': None,
'video_info': {
'path': video_path,
'frame_skip': frame_skip
}
}
# 这里可以添加更多详细信息的提取逻辑
cap.release()
return {
'feature': best_feature,
'frame_info': {
'frame_skip': frame_skip
},
'video_info': {
'path': video_path
}
}
return None
def batch_process_videos_with_blacklist_detection(self, video_infos: List[Dict], frame_skip: int = 10,
suffix: str = "_processed") -> List[Dict]:
"""
批量处理视频文件,进行黑名单检测并保存结果
参数:
video_infos: 视频信息列表,每个元素包含:
{
'video_id': 视频ID (可选),
'video_name': 视频名称 (可选),
'video_path': 视频文件路径 (必需)
}
frame_skip: 跳帧数,每隔多少帧处理一帧
suffix: 输出文件后缀
返回:
处理结果列表,每个元素包含:
{
'video_id': 视频ID,
'video_name': 视频名称,
'original_path': 原视频路径,
'processed_path': 处理后的视频路径,
'has_blacklist_match': 是否检测到黑名单中人脸,
'detection_count': 黑名单匹配次数,
'total_frames_processed': 处理的总帧数,
'blacklist_matches': 黑名单匹配详情列表
}
"""
results = []
# 确保使用黑名单模式
self.set_list_mode("0")
print(f"🎯 开始批量处理视频: 共{len(video_infos)}个视频")
print(f"🔍 检测模式: 黑名单模式, 跳帧数: {frame_skip}")
for i, video_info in enumerate(video_infos, 1):
video_id = video_info.get('video_id')
video_name = video_info.get('video_name')
video_path = video_info.get('video_path')
if not video_path:
print(f"❌ 视频信息缺少路径: {video_info}")
results.append({
'video_id': video_id,
'video_name': video_name,
'original_path': None,
'processed_path': None,
'has_blacklist_match': False,
'detection_count': 0,
'total_frames_processed': 0,
'blacklist_matches': [],
'error': '缺少视频路径'
})
continue
video_info_str = f"(ID: {video_id}, 名称: {video_name})" if video_id else ""
print(f"\n--- 处理第{i}/{len(video_infos)}个视频: {os.path.basename(video_path)} {video_info_str} ---")
if not os.path.exists(video_path):
print(f"❌ 视频文件不存在: {video_path}")
results.append({
'video_id': video_id,
'video_name': video_name,
'original_path': video_path,
'processed_path': None,
'has_blacklist_match': False,
'detection_count': 0,
'total_frames_processed': 0,
'blacklist_matches': [],
'error': '文件不存在'
})
continue
# 处理单个视频
result = self._process_single_video_with_detection(video_path, frame_skip, suffix)
if result and result['processed_path']:
# 添加视频ID和名称到结果中
result['video_id'] = video_id
result['video_name'] = video_name
results.append(result)
status = "✅ 检测到黑名单" if result['has_blacklist_match'] else "⚠️ 未检测到黑名单"
video_info_str = f"(ID: {video_id}, 名称: {video_name})" if video_id else ""
print(f"{status}: {os.path.basename(result['processed_path'])} (匹配次数: {result['detection_count']}) {video_info_str}")
else:
error_result = {
'video_id': video_id,
'video_name': video_name,
'original_path': video_path,
'processed_path': None,
'has_blacklist_match': False,
'detection_count': 0,
'total_frames_processed': 0,
'blacklist_matches': [],
'error': '处理失败'
}
results.append(error_result)
video_info_str = f"(ID: {video_id}, 名称: {video_name})" if video_id else ""
print(f"❌ 视频处理失败: {os.path.basename(video_path)} {video_info_str}")
# 统计结果
success_count = sum(1 for r in results if r.get('processed_path'))
blacklist_count = sum(1 for r in results if r.get('has_blacklist_match'))
print(f"\n🎉 批量处理完成: 成功{success_count}/{len(video_infos)}个视频")
print(f"🔍 黑名单检测结果: {blacklist_count}个视频检测到黑名单中人脸")
return results
def _process_single_video_with_detection(self, video_path: str, frame_skip: int, suffix: str) -> Optional[Dict]:
"""
处理单个视频,进行黑名单检测并保存结果
只保留处理的帧帧率控制在15-23帧之间
参数:
video_path: 视频文件路径
frame_skip: 跳帧数
suffix: 输出文件后缀
返回:
处理结果字典,包含:
{
'original_path': 原视频路径,
'processed_path': 处理后的视频路径,
'has_blacklist_match': 是否检测到黑名单中人脸,
'detection_count': 黑名单匹配次数,
'total_frames_processed': 处理的总帧数,
'blacklist_matches': 黑名单匹配详情列表
}
"""
# 打开输入视频
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 无法打开视频文件: {video_path}")
return None
# 获取视频信息
original_fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# 计算输出视频的帧率
# 目标帧率在15-23帧之间
target_fps = min(max(original_fps / frame_skip, 15), 23)
# 创建输出路径
video_dir = os.path.dirname(video_path)
video_name = os.path.splitext(os.path.basename(video_path))[0]
output_path = os.path.join(video_dir, f"{video_name}{suffix}.webm")
# 创建视频写入器,使用计算出的目标帧率
# 使用WebM格式VP8编码现代浏览器原生支持
fourcc = cv2.VideoWriter_fourcc(*'VP80')
out = cv2.VideoWriter(output_path, fourcc, target_fps, (width, height))
if not out.isOpened():
print(f"❌ 无法创建输出文件: {output_path}")
cap.release()
return None
print(f"📹 原视频信息: {width}x{height}, {original_fps:.1f}FPS, {total_frames}")
print(f"🎯 输出视频帧率: {target_fps:.1f}FPS (目标范围: 15-23FPS)")
frame_index = 0
processed_frames = 0
detection_count = 0
processed_frames_list = [] # 记录处理过的帧
while True:
ret, frame = cap.read()
if not ret:
break
# 处理帧每隔frame_skip帧处理一次
if frame_index % frame_skip == 0:
processed_frames += 1
# 人脸检测和识别
faces = self.app.get(frame)
results = []
for face in faces:
# 检查人脸质量
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
# 查找最佳匹配
best_name, similarity = self.find_best_match(face.embedding)
# 黑名单检测:在黑名单中即为匹配
is_match = best_name is not None and similarity >= self.similarity_threshold
result = {
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'best_match': best_name,
'is_match': is_match,
'det_score': float(face.det_score),
'quality_metrics': quality_metrics,
'is_acceptable': is_acceptable
}
results.append(result)
if is_match:
detection_count += 1
# 绘制检测结果
if results:
frame = self.draw_detections(frame, results)
# 只写入处理过的帧
out.write(frame)
processed_frames_list.append({
'frame_index': frame_index,
'has_detection': len(results) > 0,
'has_blacklist_match': any(r['is_match'] for r in results)
})
frame_index += 1
# 显示进度
if frame_index % 100 == 0:
print(f" 进度: {frame_index}/{total_frames}帧 ({frame_index/total_frames*100:.1f}%)")
# 释放资源
cap.release()
out.release()
# 统计处理结果
frames_with_blacklist = sum(1 for f in processed_frames_list if f['has_blacklist_match'])
print(f"📊 处理统计: 处理{processed_frames}帧, 检测到{detection_count}次黑名单匹配")
print(f"🎬 输出视频: {len(processed_frames_list)}帧, {target_fps:.1f}FPS")
# 返回详细结果
return {
'original_path': video_path,
'processed_path': output_path,
'has_blacklist_match': detection_count > 0,
'detection_count': detection_count,
'total_frames_processed': processed_frames,
'output_fps': target_fps,
'output_frame_count': len(processed_frames_list),
'blacklist_matches': [{
'frame_index': frame_index,
'best_match': result['best_match'],
'similarity': result['similarity'],
'bbox': result['bbox']
} for result in results if result['is_match']]
}
def process_video_with_custom_detection(self, video_path: str, frame_skip: int = 10,
suffix: str = "_detected",
custom_draw_callback=None) -> Optional[str]:
"""
处理视频并进行自定义检测(可扩展版本)
参数:
video_path: 视频文件路径
frame_skip: 跳帧数
suffix: 输出文件后缀
custom_draw_callback: 自定义绘制回调函数
返回:
处理后的视频路径
"""
# 打开输入视频
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 无法打开视频文件: {video_path}")
return None
# 获取视频信息
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 创建输出路径
video_dir = os.path.dirname(video_path)
video_name = os.path.splitext(os.path.basename(video_path))[0]
output_path = os.path.join(video_dir, f"{video_name}{suffix}.webm")
# 创建视频写入器
# 使用WebM格式VP8编码现代浏览器原生支持
fourcc = cv2.VideoWriter_fourcc(*'VP80')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
print(f"❌ 无法创建输出文件: {output_path}")
cap.release()
return None
frame_index = 0
while True:
ret, frame = cap.read()
if not ret:
break
# 处理帧每隔frame_skip帧处理一次
if frame_index % frame_skip == 0:
# 人脸检测和识别
faces = self.app.get(frame)
results = []
for face in faces:
# 检查人脸质量
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
# 查找最佳匹配
best_name, similarity = self.find_best_match(face.embedding)
# 根据名单模式判断是否匹配
if self.list_mode == "0":
is_match = best_name is not None and similarity >= self.similarity_threshold
else: # whitelist
is_match = best_name is not None and similarity >= self.similarity_threshold
result = {
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'best_match': best_name,
'is_match': is_match,
'det_score': float(face.det_score),
'quality_metrics': quality_metrics,
'is_acceptable': is_acceptable
}
results.append(result)
# 使用自定义绘制或默认绘制
if custom_draw_callback:
frame = custom_draw_callback(frame, results, self)
else:
frame = self.draw_detections(frame, results)
# 写入处理后的帧
out.write(frame)
frame_index += 1
# 释放资源
cap.release()
out.release()
print(f"✅ 视频处理完成: {output_path}")
return output_path