Files
SupervisorAI/backup/video_face_recognition_2.py
2025-12-20 18:07:49 +08:00

466 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# video_face_recognition.py
import cv2
import numpy as np
import time
from insightface.app import FaceAnalysis
from typing import List, Dict, Tuple
import os
class VideoFaceRecognition:
    """Face recognition over video files and live camera streams.

    Wraps an insightface ``FaceAnalysis`` pipeline. Each frame is run through
    the pipeline, every detected face is compared with an optional target
    embedding, quality metrics are computed, and the results are drawn onto
    the frame.
    """

    # Writer frame rate used when the source reports no usable FPS.
    FALLBACK_FPS = 25.0
    # Print a progress line every N processed frames.
    PROGRESS_INTERVAL = 30

    def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True):
        """Load the face analysis model and reset target/statistics state.

        Args:
            model_name: Model pack name handed to ``FaceAnalysis``.
            use_gpu: When True the model is prepared with ``ctx_id=0``,
                otherwise with ``ctx_id=-1``.
        """
        self.app = FaceAnalysis(name=model_name)
        self.app.prepare(
            ctx_id=0 if use_gpu else -1,
            det_thresh=0.3,
            det_size=(640, 640)
        )
        self.target_embedding = None
        self.target_id = None
        self.similarity_threshold = 0.3
        # Performance statistics
        self.frame_count = 0
        self.processing_times = []
        print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}")

    def set_target_face(self, image_path: str, person_id: str = "target") -> bool:
        """Register the face found in ``image_path`` as the match target.

        Args:
            image_path: Path of the reference image.
            person_id: Label stored alongside the embedding.

        Returns:
            True when a face was found and stored, False when the image could
            not be read or contained no detectable face.
        """
        img = cv2.imread(image_path)
        if img is None:
            print(f"❌ 无法读取目标图像: {image_path}")
            return False
        faces = self.app.get(img)
        if not faces:
            print(f"❌ 目标图像中未检测到人脸: {image_path}")
            return False
        # The first face returned by the pipeline is used as the target.
        self.target_embedding = faces[0].embedding
        self.target_id = person_id
        print(f"✅ 目标人脸设置: {person_id}")
        return True

    @staticmethod
    def _cosine_similarity(first, second) -> float:
        """Return the cosine similarity of two vectors, 0.0 if undefined."""
        if first is None or second is None:
            return 0.0
        vec_a = np.asarray(first, dtype=np.float64).ravel()
        vec_b = np.asarray(second, dtype=np.float64).ravel()
        denom = float(np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
        # A zero-length vector has no direction; avoid dividing by zero.
        if denom == 0.0 or not np.isfinite(denom):
            return 0.0
        return float(np.dot(vec_a, vec_b) / denom)

    def calculate_face_quality(self, face) -> Dict:
        """Compute quality metrics for one detected face.

        Args:
            face: Object exposing ``det_score`` and ``bbox`` and, optionally,
                ``pose`` (unpacked as pitch, yaw, roll) and ``kps`` (landmark
                points).

        Returns:
            Dict of plain Python floats with keys ``det_score``, ``pitch``,
            ``yaw``, ``roll``, ``bbox_area``, ``aspect_ratio``,
            ``kps_variance`` and ``quality_score``.
        """
        quality_metrics = {}
        # 1. Detection confidence
        quality_metrics['det_score'] = float(face.det_score)

        # 2. Head pose angles; default to 0 when the pipeline gives none.
        pose = getattr(face, 'pose', None)
        if pose is not None:
            pitch, yaw, roll = pose
        else:
            pitch = yaw = roll = 0.0
        quality_metrics['pitch'] = float(pitch)
        quality_metrics['yaw'] = float(yaw)
        quality_metrics['roll'] = float(roll)

        # 3. Bounding-box geometry, converted to builtin floats so the result
        #    dict does not carry numpy scalar types.
        bbox = face.bbox
        width = float(bbox[2] - bbox[0])
        height = float(bbox[3] - bbox[1])
        quality_metrics['bbox_area'] = width * height
        quality_metrics['aspect_ratio'] = width / height if height > 0 else 0.0

        # 4. Landmark spread: variance of all pairwise landmark distances.
        kps_variance = 0.0
        kps = getattr(face, 'kps', None)
        if kps is not None and len(kps) >= 5:
            points = np.asarray(kps, dtype=np.float64)
            first_idx, second_idx = np.triu_indices(len(points), k=1)
            distances = np.linalg.norm(points[first_idx] - points[second_idx], axis=1)
            kps_variance = float(np.var(distances))
        quality_metrics['kps_variance'] = kps_variance

        # 5. Overall score: detection score minus pose penalties, floored at 0.1.
        pose_penalty = 0.0
        if abs(quality_metrics['yaw']) > 30:
            pose_penalty += 0.2
        if abs(quality_metrics['pitch']) > 20:
            pose_penalty += 0.2
        quality_metrics['quality_score'] = max(
            0.1, quality_metrics['det_score'] - pose_penalty
        )
        return quality_metrics

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """Detect, score and annotate every face in a single frame.

        Args:
            frame: Image passed to the face analysis pipeline and drawn on.

        Returns:
            ``(annotated_frame, results)`` where ``results`` holds one dict
            per face (bbox, similarity, is_match, gender, age, det_score,
            quality_metrics).
        """
        # perf_counter is monotonic, so timings are immune to clock changes.
        start_time = time.perf_counter()
        faces = self.app.get(frame)
        has_target = self.target_embedding is not None
        results = []
        for face in faces:
            similarity = 0.0
            if has_target:
                similarity = self._cosine_similarity(
                    face.embedding, self.target_embedding
                )
            quality_metrics = self.calculate_face_quality(face)
            result = {
                'bbox': face.bbox.astype(int).tolist(),
                'similarity': similarity,
                # Without a target nothing can match, whatever the threshold.
                'is_match': has_target and similarity >= self.similarity_threshold,
                'gender': 'Male' if face.gender == 1 else 'Female',
                'age': int(face.age),
                'det_score': float(face.det_score),
                'quality_metrics': quality_metrics
            }
            results.append(result)
            frame = self._draw_detection(frame, result)

        # Performance statistics (milliseconds per frame)
        self.processing_times.append((time.perf_counter() - start_time) * 1000)
        self.frame_count += 1
        return frame, results

    def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
        """Draw the face box and a colour-coded label panel for one result."""
        green, yellow, red, white = (0, 255, 0), (0, 255, 255), (0, 0, 255), (255, 255, 255)
        font, font_scale, thickness, line_spacing = cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1, 2

        similarity = result['similarity']
        is_match = result['is_match']
        quality_metrics = result['quality_metrics']
        match_color = green if is_match else red

        # Face box
        x1, y1, x2, y2 = result['bbox']
        cv2.rectangle(frame, (x1, y1), (x2, y2), match_color, 2)

        # Colour for the quality lines
        quality = quality_metrics['quality_score']
        if quality > 0.7:
            quality_color = green
        elif quality > 0.4:
            quality_color = yellow
        else:
            quality_color = red

        # Colour for the pose lines
        abs_yaw = abs(quality_metrics['yaw'])
        abs_pitch = abs(quality_metrics['pitch'])
        if abs_yaw > 45 or abs_pitch > 30:
            pose_color = red
        elif abs_yaw > 30 or abs_pitch > 20:
            pose_color = yellow
        else:
            pose_color = green

        if self.target_id:
            prefix = "MATCH" if is_match else "NO MATCH"
            status = f"{prefix}: {similarity:.3f}"
        else:
            status = f"Similarity: {similarity:.3f}"

        # Labels are ASCII only: cv2.putText replaces characters the Hershey
        # fonts cannot render (such as the degree sign) with question marks.
        labelled_lines = [
            (status, match_color),
            (f"{result['gender']}/{result['age']}", white),
            (f"Quality: {quality:.3f}", quality_color),
            (f"DetScore: {quality_metrics['det_score']:.3f}", quality_color),
            (f"Pitch: {quality_metrics['pitch']:.1f} deg", pose_color),
            (f"Yaw: {quality_metrics['yaw']:.1f} deg", pose_color),
            (f"Roll: {quality_metrics['roll']:.1f} deg", pose_color),
            (f"Area: {quality_metrics['bbox_area']:.0f}", white),
            (f"Aspect: {quality_metrics['aspect_ratio']:.2f}", white),
        ]

        # Measure the panel; each advance includes the line spacing so the
        # drawn text fills the same height the background was sized for.
        max_text_width = 0
        line_advances = []
        for text, _ in labelled_lines:
            (text_width, text_height), baseline = cv2.getTextSize(
                text, font, font_scale, thickness
            )
            max_text_width = max(max_text_width, text_width)
            line_advances.append(text_height + baseline + line_spacing)
        total_text_height = sum(line_advances)

        # Panel sits above the box, or below it when it would leave the top.
        bg_x1 = x1
        bg_x2 = x1 + max_text_width + 10
        bg_y1 = y1 - total_text_height - 10
        bg_y2 = y1
        if bg_y1 < 0:
            bg_y1 = y2
            bg_y2 = y2 + total_text_height + 10

        # Semi-transparent background
        overlay = frame.copy()
        cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1)
        alpha = 0.6
        cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)

        current_y = bg_y1 + 15
        for (text, text_color), advance in zip(labelled_lines, line_advances):
            cv2.putText(frame, text, (x1 + 5, current_y),
                        font, font_scale, text_color, thickness)
            current_y += advance
        return frame

    def _open_writer(self, output_path: str, fps: float, frame_size: Tuple[int, int]):
        """Create an mp4v ``cv2.VideoWriter``; return None if not possible."""
        if not output_path:
            return None
        out_dir = os.path.dirname(output_path)
        if out_dir:
            # The writer does not create missing directories itself.
            os.makedirs(out_dir, exist_ok=True)
        # Some sources report 0 or NaN FPS, which yields an unusable file.
        if not np.isfinite(fps) or fps <= 0:
            fps = self.FALLBACK_FPS
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
        if not writer.isOpened():
            print(f"❌ 无法创建输出视频: {output_path}")
            writer.release()
            return None
        return writer

    def process_video_file(self, video_path: str, output_path: str = None,
                           skip_frames: int = 0, show_preview: bool = True):
        """Run recognition over a video file.

        Args:
            video_path: Input video path.
            output_path: Optional path of the annotated output video.
            skip_frames: Number of frames skipped after each processed frame
                (0 processes every frame); negative values are treated as 0.
            show_preview: Show a live preview window; pressing 'q' stops.
        """
        if not os.path.exists(video_path):
            print(f"❌ 视频文件不存在: {video_path}")
            return
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"❌ 无法打开视频文件: {video_path}")
            cap.release()
            return

        stride = max(0, int(skip_frames)) + 1
        out = None
        frame_index = 0
        processed_frames = 0
        timings_before = len(self.processing_times)
        start_time = time.time()
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")

            # Skipped frames are not written, so slow the output accordingly.
            out = self._open_writer(output_path, fps / stride, (width, height))

            print("🚀 开始处理视频...")
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                current_index = frame_index
                frame_index += 1
                if current_index % stride != 0:
                    continue

                processed_frame, results = self.process_frame(frame)
                processed_frames += 1
                if out is not None:
                    out.write(processed_frame)

                # Progress is keyed on processed frames: the read counter can
                # step over every multiple of the interval when frames are
                # skipped. The frame count may also be unknown (<= 0).
                if processed_frames % self.PROGRESS_INTERVAL == 0:
                    if total_frames > 0:
                        progress = (frame_index / total_frames) * 100
                        print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})")
                    else:
                        print(f"📊 已处理帧数: {processed_frames}")

                if show_preview:
                    # The status text is drawn after writing, so it only
                    # appears in the preview window.
                    fps_text = f"Frame: {current_index}/{total_frames} | Faces: {len(results)}"
                    cv2.putText(processed_frame, fps_text, (10, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
                    cv2.imshow('Video Face Recognition', processed_frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
        finally:
            # Release resources even if processing raised.
            cap.release()
            if out is not None:
                out.release()
            if show_preview:
                cv2.destroyAllWindows()

        # Statistics for this run only
        total_time = time.time() - start_time
        run_times = self.processing_times[timings_before:]
        avg_processing_time = np.mean(run_times) if run_times else 0
        actual_fps = processed_frames / total_time if total_time > 0 else 0.0
        print(f"\n🎉 视频处理完成!")
        print(f"📊 性能统计:")
        print(f" 总处理帧数: {processed_frames}")
        print(f" 总耗时: {total_time:.1f}")
        print(f" 平均每帧: {avg_processing_time:.1f}ms")
        print(f" 实际FPS: {actual_fps:.1f}")
        if out is not None:
            print(f" 输出视频: {output_path}")

    def process_webcam(self, camera_id: int = 0, output_path: str = None):
        """Run recognition on a live camera stream until 'q' is pressed.

        Args:
            camera_id: Device index passed to ``cv2.VideoCapture``.
            output_path: Optional path for recording the annotated stream.
        """
        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            print(f"❌ 无法打开摄像头 {camera_id}")
            cap.release()
            return

        out = None
        try:
            # Request a resolution, then read back what the capture reports.
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
            if output_path:
                fps = cap.get(cv2.CAP_PROP_FPS)
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                out = self._open_writer(output_path, fps, (width, height))

            print("🎥 开始摄像头实时识别 (按 'q' 退出)...")
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("❌ 无法读取摄像头帧")
                    break

                processed_frame, results = self.process_frame(frame)

                # Instantaneous FPS from the latest timing; guard a 0 ms value.
                last_ms = self.processing_times[-1] if self.processing_times else 0
                current_fps = 1000 / last_ms if last_ms > 0 else 0
                info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}"
                cv2.putText(processed_frame, info_text, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

                if out is not None:
                    out.write(processed_frame)

                cv2.imshow('Real-time Face Recognition', processed_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release resources even if processing raised.
            cap.release()
            if out is not None:
                out.release()
            cv2.destroyAllWindows()
        print("✅ 摄像头处理结束")
# 使用示例
def main():
    """Interactive demo: register a target face, then run the chosen mode."""
    video_system = VideoFaceRecognition(use_gpu=True)

    # Registering a target is optional; it is skipped when the image is absent.
    target_image = "test_data/register/sy.jpg"
    if os.path.exists(target_image):
        video_system.set_target_face(target_image, "目标人物")

    # Ask which mode to run
    for menu_line in ("请选择处理模式:", "1. 处理视频文件", "2. 实时摄像头"):
        print(menu_line)
    choice = input("请输入选择 (1 或 2): ").strip()

    if choice == "1":
        # Video file mode; skip_frames=1 processes every second frame.
        video_system.process_video_file(
            video_path="test_data/video/video_1.mp4",
            output_path="test_data/output_video/video_1_quality.mp4",
            skip_frames=1,
            show_preview=True
        )
        return

    if choice == "2":
        # Live camera mode, recording the annotated stream to disk.
        video_system.process_webcam(
            camera_id=0,
            output_path="webcam_recording.mp4"
        )
        return

    print("❌ 无效选择")
# Run the interactive demo only when executed as a script, not on import.
if __name__ == "__main__":
    main()