# Source: SupervisorAI/src/video_face_recognition.py
# NOTE: web-viewer chrome ("Raw / Blame / History", line counts, and the
# ambiguous-Unicode warning) removed so this file parses as Python.

# video_face_recognition.py
import cv2
import numpy as np
import time
from insightface.app import FaceAnalysis
from typing import List, Dict, Tuple
import os
class VideoFaceRecognition:
    """
    Video face recognition system.

    Supports both real-time camera streams and video-file processing.
    """

    def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True):
        """Initialize the insightface analysis model and matching state.

        Args:
            model_name: insightface model-pack name (default 'buffalo_l').
            use_gpu: if True, prepare on GPU context 0; otherwise CPU (ctx_id=-1).
        """
        # Initialize the face detection/recognition model.
        self.app = FaceAnalysis(name=model_name)
        self.app.prepare(
            ctx_id=0 if use_gpu else -1,
            det_thresh=0.3,      # detection confidence threshold
            det_size=(640, 640)  # detector input size
        )
        # Embedding and label of the reference face (set via set_target_face).
        self.target_embedding = None
        self.target_id = None
        # Cosine-similarity cutoff above which a face counts as a match.
        self.similarity_threshold = 0.3
        # Performance statistics: processed-frame count and per-frame times (ms).
        self.frame_count = 0
        self.processing_times = []
        print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}")
def set_target_face(self, image_path: str, person_id: str = "target") -> bool:
"""设置目标人脸"""
img = cv2.imread(image_path)
if img is None:
print(f"❌ 无法读取目标图像: {image_path}")
return False
faces = self.app.get(img)
if not faces:
print(f"❌ 目标图像中未检测到人脸: {image_path}")
return False
self.target_embedding = faces[0].embedding
self.target_id = person_id
print(f"✅ 目标人脸设置: {person_id}")
return True
def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
"""
处理单帧图像
返回: (处理后的帧, 识别结果列表)
"""
start_time = time.time()
# 人脸检测和识别
faces = self.app.get(frame)
results = []
for face in faces:
similarity = 0.0
if self.target_embedding is not None:
# 计算相似度
emb1 = face.embedding / np.linalg.norm(face.embedding)
emb2 = self.target_embedding / np.linalg.norm(self.target_embedding)
similarity = float(np.dot(emb1, emb2))
result = {
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'is_match': similarity >= self.similarity_threshold,
'gender': 'Male' if face.gender == 1 else 'Female',
'age': int(face.age),
'det_score': float(face.det_score)
}
results.append(result)
# 在帧上绘制结果
frame = self._draw_detection(frame, result)
# 性能统计
processing_time = (time.time() - start_time) * 1000
self.processing_times.append(processing_time)
self.frame_count += 1
return frame, results
def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
"""在帧上绘制检测结果"""
bbox = result['bbox']
similarity = result['similarity']
is_match = result['is_match']
# 选择颜色
if is_match:
color = (0, 255, 0) # 绿色 - 匹配
# elif similarity > self.similarity_threshold/2:
# color = (0, 255, 255) # 黄色 - 中等相似度
else:
color = (0, 0, 255) # 红色 - 不匹配
# 绘制人脸框
x1, y1, x2, y2 = bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
# 绘制信息文本
if self.target_id:
status = f"MATCH: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}"
# text = f"{self.target_id}: {status}"
text = status
else:
text = f"Similarity: {similarity:.3f}"
# 文本背景
text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
cv2.rectangle(frame, (x1, y1 - text_size[1] - 10), (x1 + text_size[0], y1), color, -1)
# 文本
cv2.putText(frame, text, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# 详细信息
info_text = f"{result['gender']}/{result['age']}"
cv2.putText(frame, info_text, (x1, y2 + 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
return frame
def process_video_file(self, video_path: str, output_path: str = None,
skip_frames: int = 0, show_preview: bool = True):
"""
处理视频文件
Args:
video_path: 输入视频路径
output_path: 输出视频路径
skip_frames: 跳帧数,用于提高处理速度
show_preview: 是否显示实时预览
"""
if not os.path.exists(video_path):
print(f"❌ 视频文件不存在: {video_path}")
return
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 无法打开视频文件: {video_path}")
return
# 获取视频信息
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
# 设置输出视频
if output_path:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps/(skip_frames), (width, height))
else:
out = None
# 处理视频帧
frame_index = 0
processed_frames = 0
start_time = time.time()
print("🚀 开始处理视频...")
while True:
ret, frame = cap.read()
if not ret:
break
# 跳帧处理
if skip_frames > 0 and frame_index % (skip_frames + 1) != 0:
frame_index += 1
continue
# 处理当前帧
processed_frame, results = self.process_frame(frame)
# 写入输出视频
if out:
out.write(processed_frame)
# 显示预览
if show_preview:
# 添加性能信息
fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)}"
cv2.putText(processed_frame, fps_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
cv2.imshow('Video Face Recognition', processed_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_index += 1
processed_frames += 1
# 进度显示
if frame_index % 30 == 0:
progress = (frame_index / total_frames) * 100
print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})")
# 清理资源
cap.release()
if out:
out.release()
if show_preview:
cv2.destroyAllWindows()
# 性能统计
total_time = time.time() - start_time
avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0
print(f"\n🎉 视频处理完成!")
print(f"📊 性能统计:")
print(f" 总处理帧数: {processed_frames}")
print(f" 总耗时: {total_time:.1f}")
print(f" 平均每帧: {avg_processing_time:.1f}ms")
print(f" 实际FPS: {processed_frames / total_time:.1f}")
if output_path:
print(f" 输出视频: {output_path}")
def process_webcam(self, camera_id: int = 0, output_path: str = None):
"""
处理摄像头实时视频流
"""
cap = cv2.VideoCapture(camera_id)
if not cap.isOpened():
print(f"❌ 无法打开摄像头 {camera_id}")
return
# 设置摄像头分辨率(可选)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
# 设置输出视频
if output_path:
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
else:
out = None
print("🎥 开始摄像头实时识别 (按 'q' 退出)...")
while True:
ret, frame = cap.read()
if not ret:
print("❌ 无法读取摄像头帧")
break
# 处理当前帧
processed_frame, results = self.process_frame(frame)
# 添加实时信息
current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0
info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}"
cv2.putText(processed_frame, info_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
# 写入输出
if out:
out.write(processed_frame)
# 显示预览
cv2.imshow('Real-time Face Recognition', processed_frame)
# 按'q'退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# 清理资源
cap.release()
if out:
out.release()
cv2.destroyAllWindows()
print("✅ 摄像头处理结束")
# Usage example
def main():
    """Demo entry point: register a target face, then process a video file."""
    # Create the video recognition system.
    video_system = VideoFaceRecognition(use_gpu=True)

    # Register the target face (optional — skipped if the image is missing).
    target_image = "test_data/register/sy.jpg"
    if os.path.exists(target_image):
        video_system.set_target_face(target_image, "目标人物")

    print("请选择处理模式:")
    # Interactive selection is disabled; mode is hard-wired to video-file
    # processing. Re-enable with:
    # choice = input("请输入选择 (1 或 2): ").strip()
    choice = "1"

    if choice == "1":
        # Process a video file.
        video_path = "test_data/video/video_1.mp4"
        output_path = "test_data/output_video/video_1.mp4"
        # Speed optimization: process 1 of every 2 frames.
        skip_frames = 1
        video_system.process_video_file(
            video_path=video_path,
            output_path=output_path,
            skip_frames=skip_frames,
            show_preview=True
        )
    elif choice == "2":
        # Real-time camera mode.
        output_path = "webcam_recording.mp4"  # optional: save the recording
        video_system.process_webcam(
            camera_id=0,
            output_path=output_path
        )
    else:
        print("❌ 无效选择")


if __name__ == "__main__":
    main()