603 lines
23 KiB
Python
603 lines
23 KiB
Python
"""
|
||
视频检查业务类
|
||
继承自BaseFaceBiz,专门处理视频中的人脸特征提取
|
||
"""
|
||
|
||
import cv2
|
||
import numpy as np
|
||
from typing import Optional, List, Dict
|
||
import os
|
||
from insightface.app import FaceAnalysis
|
||
|
||
from biz.base_face_biz import BaseFaceBiz
|
||
|
||
|
||
class VideoCheckBiz(BaseFaceBiz):
|
||
"""
|
||
视频检查业务类
|
||
专门处理视频中的人脸特征提取和质量评估
|
||
"""
|
||
|
||
def __init__(self, face_analysis: FaceAnalysis):
|
||
"""
|
||
初始化视频检查业务类
|
||
|
||
参数:
|
||
face_analysis: 已初始化好的FaceAnalysis实例
|
||
"""
|
||
super().__init__(face_analysis)
|
||
|
||
def draw_detections(self, frame: np.ndarray, results: List[Dict]) -> np.ndarray:
|
||
"""
|
||
重写绘制检测结果方法
|
||
只在检测到黑名单匹配时用红色绘制人脸框
|
||
|
||
参数:
|
||
frame: 原始帧图像
|
||
results: 检测结果列表
|
||
|
||
返回:
|
||
绘制后的帧图像
|
||
"""
|
||
for result in results:
|
||
# 只在黑名单匹配时绘制
|
||
if result['is_match']:
|
||
bbox = result['bbox']
|
||
|
||
# 使用红色绘制人脸框
|
||
x1, y1, x2, y2 = bbox
|
||
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
|
||
|
||
# 添加简单的匹配信息
|
||
best_match = result['best_match']
|
||
similarity = result['similarity']
|
||
|
||
# 绘制匹配信息
|
||
text = f"{best_match}: {similarity:.3f}"
|
||
text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
|
||
|
||
# 绘制文本背景
|
||
cv2.rectangle(frame, (x1, y1 - text_size[1] - 5),
|
||
(x1 + text_size[0], y1), (0, 0, 0), -1)
|
||
|
||
# 绘制文本
|
||
cv2.putText(frame, text, (x1, y1 - 5),
|
||
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
|
||
|
||
return frame
|
||
|
||
def extract_best_face_from_video(self, video_path: str, frame_skip: int = 10) -> Optional[np.ndarray]:
|
||
"""
|
||
从视频中提取最佳人脸特征
|
||
|
||
参数:
|
||
video_path: 视频文件路径
|
||
frame_skip: 跳帧数,每隔多少帧取一帧
|
||
|
||
返回:
|
||
最佳人脸的特征向量,如果未找到合适的人脸则返回None
|
||
"""
|
||
if not os.path.exists(video_path):
|
||
print(f"❌ 视频文件不存在: {video_path}")
|
||
return None
|
||
|
||
# 打开视频文件
|
||
cap = cv2.VideoCapture(video_path)
|
||
if not cap.isOpened():
|
||
print(f"❌ 无法打开视频文件: {video_path}")
|
||
return None
|
||
|
||
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||
|
||
print(f"📹 视频信息: {total_frames}帧, {fps:.1f}FPS")
|
||
print(f"🔍 跳帧设置: 每{frame_skip}帧取一帧")
|
||
|
||
best_quality_score = -1.0
|
||
best_feature = None
|
||
best_frame_info = None
|
||
processed_frames = 0
|
||
valid_frames = 0
|
||
|
||
frame_index = 0
|
||
while True:
|
||
ret, frame = cap.read()
|
||
if not ret:
|
||
break
|
||
|
||
# 跳过指定帧数
|
||
if frame_index % frame_skip != 0:
|
||
frame_index += 1
|
||
continue
|
||
|
||
processed_frames += 1
|
||
|
||
# 人脸检测
|
||
faces = self.app.get(frame)
|
||
|
||
# 检查人脸数量
|
||
if len(faces) == 0:
|
||
# 无人脸,跳过
|
||
frame_index += 1
|
||
continue
|
||
elif len(faces) > 1:
|
||
# 超过1个人脸,舍弃该帧
|
||
frame_index += 1
|
||
continue
|
||
|
||
# 只有1个人脸
|
||
face = faces[0]
|
||
|
||
# 检查人脸质量
|
||
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
|
||
|
||
if not is_acceptable:
|
||
# 质量不可接受,跳过
|
||
frame_index += 1
|
||
continue
|
||
|
||
valid_frames += 1
|
||
|
||
# 计算综合质量得分
|
||
quality_score = self._calculate_face_quality_score(face, quality_metrics)
|
||
|
||
# 更新最佳人脸
|
||
if quality_score > best_quality_score:
|
||
best_quality_score = quality_score
|
||
best_feature = face.embedding
|
||
best_frame_info = {
|
||
'frame_index': frame_index,
|
||
'quality_score': quality_score,
|
||
'quality_metrics': quality_metrics
|
||
}
|
||
|
||
frame_index += 1
|
||
|
||
cap.release()
|
||
|
||
# 输出结果统计
|
||
print(f"📊 处理统计: 共处理{processed_frames}帧, 有效帧{valid_frames}帧")
|
||
|
||
if best_feature is not None:
|
||
print(f"✅ 找到最佳人脸: 帧{best_frame_info['frame_index']}, 质量得分: {best_quality_score:.3f}")
|
||
print(f" 检测得分: {best_frame_info['quality_metrics']['det_score']:.3f}")
|
||
print(f" 清晰度: {best_frame_info['quality_metrics']['clarity_score']:.1f}")
|
||
print(f" 姿态角度: pitch={best_frame_info['quality_metrics']['pitch']:.1f}°, yaw={best_frame_info['quality_metrics']['yaw']:.1f}°")
|
||
print(f" 人脸尺寸: {best_frame_info['quality_metrics']['bbox_width']}x{best_frame_info['quality_metrics']['bbox_height']}")
|
||
else:
|
||
print("❌ 未找到合适的人脸")
|
||
|
||
return best_feature
|
||
|
||
def _calculate_face_quality_score(self, face, quality_metrics: Dict) -> float:
|
||
"""
|
||
计算人脸的综合质量得分
|
||
|
||
参数:
|
||
face: 人脸检测结果
|
||
quality_metrics: 质量指标字典
|
||
|
||
返回:
|
||
综合质量得分 (0-1)
|
||
"""
|
||
# 基础得分:检测置信度
|
||
base_score = quality_metrics['det_score']
|
||
|
||
# 清晰度得分 (归一化到0-1)
|
||
clarity_score = min(quality_metrics['clarity_score'] / self.clarity_threshold, 1.0)
|
||
|
||
# 姿态得分 (基于角度偏差)
|
||
pitch_score = 1.0 - min(abs(quality_metrics['pitch']) / self.pitch_threshold, 1.0)
|
||
yaw_score = 1.0 - min(abs(quality_metrics['yaw']) / self.yaw_threshold, 1.0)
|
||
|
||
# 尺寸得分 (基于最小尺寸要求)
|
||
min_dim = min(quality_metrics['bbox_width'], quality_metrics['bbox_height'])
|
||
size_score = min(min_dim / self.min_face_size, 1.0)
|
||
|
||
# 综合得分权重
|
||
weights = {
|
||
'base': 0.3, # 检测置信度
|
||
'clarity': 0.3, # 清晰度
|
||
'pose': 0.2, # 姿态
|
||
'size': 0.2 # 尺寸
|
||
}
|
||
|
||
# 计算综合得分
|
||
pose_score = (pitch_score + yaw_score) / 2
|
||
|
||
total_score = (
|
||
base_score * weights['base'] +
|
||
clarity_score * weights['clarity'] +
|
||
pose_score * weights['pose'] +
|
||
size_score * weights['size']
|
||
)
|
||
|
||
return total_score
|
||
|
||
def extract_best_face_from_video_with_details(self, video_path: str, frame_skip: int = 10) -> Optional[Dict]:
|
||
"""
|
||
从视频中提取最佳人脸特征(带详细信息)
|
||
|
||
参数:
|
||
video_path: 视频文件路径
|
||
frame_skip: 跳帧数
|
||
|
||
返回:
|
||
包含特征向量和详细信息的字典,如果未找到合适的人脸则返回None
|
||
"""
|
||
best_feature = self.extract_best_face_from_video(video_path, frame_skip)
|
||
|
||
if best_feature is not None:
|
||
# 重新处理视频获取详细信息
|
||
cap = cv2.VideoCapture(video_path)
|
||
if not cap.isOpened():
|
||
return {
|
||
'feature': best_feature,
|
||
'frame_info': None,
|
||
'video_info': {
|
||
'path': video_path,
|
||
'frame_skip': frame_skip
|
||
}
|
||
}
|
||
|
||
# 这里可以添加更多详细信息的提取逻辑
|
||
cap.release()
|
||
|
||
return {
|
||
'feature': best_feature,
|
||
'frame_info': {
|
||
'frame_skip': frame_skip
|
||
},
|
||
'video_info': {
|
||
'path': video_path
|
||
}
|
||
}
|
||
|
||
return None
|
||
|
||
def batch_process_videos_with_blacklist_detection(self, video_infos: List[Dict], frame_skip: int = 10,
|
||
suffix: str = "_processed") -> List[Dict]:
|
||
"""
|
||
批量处理视频文件,进行黑名单检测并保存结果
|
||
|
||
参数:
|
||
video_infos: 视频信息列表,每个元素包含:
|
||
{
|
||
'video_id': 视频ID (可选),
|
||
'video_name': 视频名称 (可选),
|
||
'video_path': 视频文件路径 (必需)
|
||
}
|
||
frame_skip: 跳帧数,每隔多少帧处理一帧
|
||
suffix: 输出文件后缀
|
||
|
||
返回:
|
||
处理结果列表,每个元素包含:
|
||
{
|
||
'video_id': 视频ID,
|
||
'video_name': 视频名称,
|
||
'original_path': 原视频路径,
|
||
'processed_path': 处理后的视频路径,
|
||
'has_blacklist_match': 是否检测到黑名单中人脸,
|
||
'detection_count': 黑名单匹配次数,
|
||
'total_frames_processed': 处理的总帧数,
|
||
'blacklist_matches': 黑名单匹配详情列表
|
||
}
|
||
"""
|
||
results = []
|
||
|
||
# 确保使用黑名单模式
|
||
self.set_list_mode("0")
|
||
|
||
print(f"🎯 开始批量处理视频: 共{len(video_infos)}个视频")
|
||
print(f"🔍 检测模式: 黑名单模式, 跳帧数: {frame_skip}")
|
||
|
||
for i, video_info in enumerate(video_infos, 1):
|
||
video_id = video_info.get('video_id')
|
||
video_name = video_info.get('video_name')
|
||
video_path = video_info.get('video_path')
|
||
|
||
if not video_path:
|
||
print(f"❌ 视频信息缺少路径: {video_info}")
|
||
results.append({
|
||
'video_id': video_id,
|
||
'video_name': video_name,
|
||
'original_path': None,
|
||
'processed_path': None,
|
||
'has_blacklist_match': False,
|
||
'detection_count': 0,
|
||
'total_frames_processed': 0,
|
||
'blacklist_matches': [],
|
||
'error': '缺少视频路径'
|
||
})
|
||
continue
|
||
|
||
video_info_str = f"(ID: {video_id}, 名称: {video_name})" if video_id else ""
|
||
print(f"\n--- 处理第{i}/{len(video_infos)}个视频: {os.path.basename(video_path)} {video_info_str} ---")
|
||
|
||
if not os.path.exists(video_path):
|
||
print(f"❌ 视频文件不存在: {video_path}")
|
||
results.append({
|
||
'video_id': video_id,
|
||
'video_name': video_name,
|
||
'original_path': video_path,
|
||
'processed_path': None,
|
||
'has_blacklist_match': False,
|
||
'detection_count': 0,
|
||
'total_frames_processed': 0,
|
||
'blacklist_matches': [],
|
||
'error': '文件不存在'
|
||
})
|
||
continue
|
||
|
||
# 处理单个视频
|
||
result = self._process_single_video_with_detection(video_path, frame_skip, suffix)
|
||
|
||
if result and result['processed_path']:
|
||
# 添加视频ID和名称到结果中
|
||
result['video_id'] = video_id
|
||
result['video_name'] = video_name
|
||
results.append(result)
|
||
status = "✅ 检测到黑名单" if result['has_blacklist_match'] else "⚠️ 未检测到黑名单"
|
||
video_info_str = f"(ID: {video_id}, 名称: {video_name})" if video_id else ""
|
||
print(f"{status}: {os.path.basename(result['processed_path'])} (匹配次数: {result['detection_count']}) {video_info_str}")
|
||
else:
|
||
error_result = {
|
||
'video_id': video_id,
|
||
'video_name': video_name,
|
||
'original_path': video_path,
|
||
'processed_path': None,
|
||
'has_blacklist_match': False,
|
||
'detection_count': 0,
|
||
'total_frames_processed': 0,
|
||
'blacklist_matches': [],
|
||
'error': '处理失败'
|
||
}
|
||
results.append(error_result)
|
||
video_info_str = f"(ID: {video_id}, 名称: {video_name})" if video_id else ""
|
||
print(f"❌ 视频处理失败: {os.path.basename(video_path)} {video_info_str}")
|
||
|
||
# 统计结果
|
||
success_count = sum(1 for r in results if r.get('processed_path'))
|
||
blacklist_count = sum(1 for r in results if r.get('has_blacklist_match'))
|
||
|
||
print(f"\n🎉 批量处理完成: 成功{success_count}/{len(video_infos)}个视频")
|
||
print(f"🔍 黑名单检测结果: {blacklist_count}个视频检测到黑名单中人脸")
|
||
|
||
return results
|
||
|
||
def _process_single_video_with_detection(self, video_path: str, frame_skip: int, suffix: str) -> Optional[Dict]:
|
||
"""
|
||
处理单个视频,进行黑名单检测并保存结果
|
||
只保留处理的帧,帧率控制在15-23帧之间
|
||
|
||
参数:
|
||
video_path: 视频文件路径
|
||
frame_skip: 跳帧数
|
||
suffix: 输出文件后缀
|
||
|
||
返回:
|
||
处理结果字典,包含:
|
||
{
|
||
'original_path': 原视频路径,
|
||
'processed_path': 处理后的视频路径,
|
||
'has_blacklist_match': 是否检测到黑名单中人脸,
|
||
'detection_count': 黑名单匹配次数,
|
||
'total_frames_processed': 处理的总帧数,
|
||
'blacklist_matches': 黑名单匹配详情列表
|
||
}
|
||
"""
|
||
# 打开输入视频
|
||
cap = cv2.VideoCapture(video_path)
|
||
if not cap.isOpened():
|
||
print(f"❌ 无法打开视频文件: {video_path}")
|
||
return None
|
||
|
||
# 获取视频信息
|
||
original_fps = cap.get(cv2.CAP_PROP_FPS)
|
||
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||
|
||
# 计算输出视频的帧率
|
||
# 目标帧率在15-23帧之间
|
||
target_fps = min(max(original_fps / frame_skip, 15), 23)
|
||
|
||
# 创建输出路径
|
||
video_dir = os.path.dirname(video_path)
|
||
video_name = os.path.splitext(os.path.basename(video_path))[0]
|
||
output_path = os.path.join(video_dir, f"{video_name}{suffix}.webm")
|
||
|
||
# 创建视频写入器,使用计算出的目标帧率
|
||
# 使用WebM格式(VP8编码),现代浏览器原生支持
|
||
fourcc = cv2.VideoWriter_fourcc(*'VP80')
|
||
out = cv2.VideoWriter(output_path, fourcc, target_fps, (width, height))
|
||
|
||
if not out.isOpened():
|
||
print(f"❌ 无法创建输出文件: {output_path}")
|
||
cap.release()
|
||
return None
|
||
|
||
print(f"📹 原视频信息: {width}x{height}, {original_fps:.1f}FPS, {total_frames}帧")
|
||
print(f"🎯 输出视频帧率: {target_fps:.1f}FPS (目标范围: 15-23FPS)")
|
||
|
||
frame_index = 0
|
||
processed_frames = 0
|
||
detection_count = 0
|
||
processed_frames_list = [] # 记录处理过的帧
|
||
|
||
while True:
|
||
ret, frame = cap.read()
|
||
if not ret:
|
||
break
|
||
|
||
# 处理帧:每隔frame_skip帧处理一次
|
||
if frame_index % frame_skip == 0:
|
||
processed_frames += 1
|
||
|
||
# 人脸检测和识别
|
||
faces = self.app.get(frame)
|
||
results = []
|
||
|
||
for face in faces:
|
||
# 检查人脸质量
|
||
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
|
||
|
||
# 查找最佳匹配
|
||
best_name, similarity = self.find_best_match(face.embedding)
|
||
|
||
# 黑名单检测:在黑名单中即为匹配
|
||
is_match = best_name is not None and similarity >= self.similarity_threshold
|
||
|
||
result = {
|
||
'bbox': face.bbox.astype(int).tolist(),
|
||
'similarity': similarity,
|
||
'best_match': best_name,
|
||
'is_match': is_match,
|
||
'det_score': float(face.det_score),
|
||
'quality_metrics': quality_metrics,
|
||
'is_acceptable': is_acceptable
|
||
}
|
||
results.append(result)
|
||
|
||
if is_match:
|
||
detection_count += 1
|
||
|
||
# 绘制检测结果
|
||
if results:
|
||
frame = self.draw_detections(frame, results)
|
||
|
||
# 只写入处理过的帧
|
||
out.write(frame)
|
||
processed_frames_list.append({
|
||
'frame_index': frame_index,
|
||
'has_detection': len(results) > 0,
|
||
'has_blacklist_match': any(r['is_match'] for r in results)
|
||
})
|
||
|
||
frame_index += 1
|
||
|
||
# 显示进度
|
||
if frame_index % 100 == 0:
|
||
print(f" 进度: {frame_index}/{total_frames}帧 ({frame_index/total_frames*100:.1f}%)")
|
||
|
||
# 释放资源
|
||
cap.release()
|
||
out.release()
|
||
|
||
# 统计处理结果
|
||
frames_with_blacklist = sum(1 for f in processed_frames_list if f['has_blacklist_match'])
|
||
|
||
print(f"📊 处理统计: 处理{processed_frames}帧, 检测到{detection_count}次黑名单匹配")
|
||
print(f"🎬 输出视频: {len(processed_frames_list)}帧, {target_fps:.1f}FPS")
|
||
|
||
# 返回详细结果
|
||
return {
|
||
'original_path': video_path,
|
||
'processed_path': output_path,
|
||
'has_blacklist_match': detection_count > 0,
|
||
'detection_count': detection_count,
|
||
'total_frames_processed': processed_frames,
|
||
'output_fps': target_fps,
|
||
'output_frame_count': len(processed_frames_list),
|
||
'blacklist_matches': [{
|
||
'frame_index': frame_index,
|
||
'best_match': result['best_match'],
|
||
'similarity': result['similarity'],
|
||
'bbox': result['bbox']
|
||
} for result in results if result['is_match']]
|
||
}
|
||
|
||
def process_video_with_custom_detection(self, video_path: str, frame_skip: int = 10,
|
||
suffix: str = "_detected",
|
||
custom_draw_callback=None) -> Optional[str]:
|
||
"""
|
||
处理视频并进行自定义检测(可扩展版本)
|
||
|
||
参数:
|
||
video_path: 视频文件路径
|
||
frame_skip: 跳帧数
|
||
suffix: 输出文件后缀
|
||
custom_draw_callback: 自定义绘制回调函数
|
||
|
||
返回:
|
||
处理后的视频路径
|
||
"""
|
||
# 打开输入视频
|
||
cap = cv2.VideoCapture(video_path)
|
||
if not cap.isOpened():
|
||
print(f"❌ 无法打开视频文件: {video_path}")
|
||
return None
|
||
|
||
# 获取视频信息
|
||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||
|
||
# 创建输出路径
|
||
video_dir = os.path.dirname(video_path)
|
||
video_name = os.path.splitext(os.path.basename(video_path))[0]
|
||
output_path = os.path.join(video_dir, f"{video_name}{suffix}.webm")
|
||
|
||
# 创建视频写入器
|
||
# 使用WebM格式(VP8编码),现代浏览器原生支持
|
||
fourcc = cv2.VideoWriter_fourcc(*'VP80')
|
||
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
|
||
|
||
if not out.isOpened():
|
||
print(f"❌ 无法创建输出文件: {output_path}")
|
||
cap.release()
|
||
return None
|
||
|
||
frame_index = 0
|
||
|
||
while True:
|
||
ret, frame = cap.read()
|
||
if not ret:
|
||
break
|
||
|
||
# 处理帧:每隔frame_skip帧处理一次
|
||
if frame_index % frame_skip == 0:
|
||
# 人脸检测和识别
|
||
faces = self.app.get(frame)
|
||
results = []
|
||
|
||
for face in faces:
|
||
# 检查人脸质量
|
||
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
|
||
|
||
# 查找最佳匹配
|
||
best_name, similarity = self.find_best_match(face.embedding)
|
||
|
||
# 根据名单模式判断是否匹配
|
||
if self.list_mode == "0":
|
||
is_match = best_name is not None and similarity >= self.similarity_threshold
|
||
else: # whitelist
|
||
is_match = best_name is not None and similarity >= self.similarity_threshold
|
||
|
||
result = {
|
||
'bbox': face.bbox.astype(int).tolist(),
|
||
'similarity': similarity,
|
||
'best_match': best_name,
|
||
'is_match': is_match,
|
||
'det_score': float(face.det_score),
|
||
'quality_metrics': quality_metrics,
|
||
'is_acceptable': is_acceptable
|
||
}
|
||
results.append(result)
|
||
|
||
# 使用自定义绘制或默认绘制
|
||
if custom_draw_callback:
|
||
frame = custom_draw_callback(frame, results, self)
|
||
else:
|
||
frame = self.draw_detections(frame, results)
|
||
|
||
# 写入处理后的帧
|
||
out.write(frame)
|
||
frame_index += 1
|
||
|
||
# 释放资源
|
||
cap.release()
|
||
out.release()
|
||
|
||
print(f"✅ 视频处理完成: {output_path}")
|
||
return output_path |