完成视频人脸检测,输出视频文件

This commit is contained in:
zqc
2025-11-11 11:20:02 +08:00
parent ecfeb1fd6c
commit 3dfe075080
2 changed files with 341 additions and 1 deletions

View File

@@ -39,7 +39,7 @@ def comprehensive_gpu_test():
cv2.rectangle(test_image, (100, 100), (200, 200), (255, 255, 255), -1)
cv2.rectangle(test_image, (300, 150), (400, 250), (255, 255, 255), -1)
cv2.rectangle(test_image, (500, 200), (600, 300), (255, 255, 255), -1)
cv2.imwrite("test_data/multi_face_test.jpg", test_image)
# cv2.imwrite("test_data/multi_face_test.jpg", test_image)
print("✅ 测试数据创建完成")

View File

@@ -0,0 +1,340 @@
# video_face_recognition.py
import cv2
import numpy as np
import time
from insightface.app import FaceAnalysis
from typing import List, Dict, Tuple
import os
class VideoFaceRecognition:
"""
视频人脸识别系统
支持实时视频流和视频文件处理
"""
def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True):
# 初始化人脸识别模型
self.app = FaceAnalysis(name=model_name)
self.app.prepare(
ctx_id=0 if use_gpu else -1,
det_thresh=0.3,
det_size=(640, 640)
)
self.target_embedding = None
self.target_id = None
self.similarity_threshold = 0.3
# 性能统计
self.frame_count = 0
self.processing_times = []
print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}")
def set_target_face(self, image_path: str, person_id: str = "target") -> bool:
"""设置目标人脸"""
img = cv2.imread(image_path)
if img is None:
print(f"❌ 无法读取目标图像: {image_path}")
return False
faces = self.app.get(img)
if not faces:
print(f"❌ 目标图像中未检测到人脸: {image_path}")
return False
self.target_embedding = faces[0].embedding
self.target_id = person_id
print(f"✅ 目标人脸设置: {person_id}")
return True
def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
"""
处理单帧图像
返回: (处理后的帧, 识别结果列表)
"""
start_time = time.time()
# 人脸检测和识别
faces = self.app.get(frame)
results = []
for face in faces:
similarity = 0.0
if self.target_embedding is not None:
# 计算相似度
emb1 = face.embedding / np.linalg.norm(face.embedding)
emb2 = self.target_embedding / np.linalg.norm(self.target_embedding)
similarity = float(np.dot(emb1, emb2))
result = {
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'is_match': similarity >= self.similarity_threshold,
'gender': 'Male' if face.gender == 1 else 'Female',
'age': int(face.age),
'det_score': float(face.det_score)
}
results.append(result)
# 在帧上绘制结果
frame = self._draw_detection(frame, result)
# 性能统计
processing_time = (time.time() - start_time) * 1000
self.processing_times.append(processing_time)
self.frame_count += 1
return frame, results
def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
"""在帧上绘制检测结果"""
bbox = result['bbox']
similarity = result['similarity']
is_match = result['is_match']
# 选择颜色
if is_match:
color = (0, 255, 0) # 绿色 - 匹配
# elif similarity > self.similarity_threshold/2:
# color = (0, 255, 255) # 黄色 - 中等相似度
else:
color = (0, 0, 255) # 红色 - 不匹配
# 绘制人脸框
x1, y1, x2, y2 = bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
# 绘制信息文本
if self.target_id:
status = f"MATCH: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}"
# text = f"{self.target_id}: {status}"
text = status
else:
text = f"Similarity: {similarity:.3f}"
# 文本背景
text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
cv2.rectangle(frame, (x1, y1 - text_size[1] - 10), (x1 + text_size[0], y1), color, -1)
# 文本
cv2.putText(frame, text, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# 详细信息
info_text = f"{result['gender']}/{result['age']}"
cv2.putText(frame, info_text, (x1, y2 + 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
return frame
def process_video_file(self, video_path: str, output_path: str = None,
skip_frames: int = 0, show_preview: bool = True):
"""
处理视频文件
Args:
video_path: 输入视频路径
output_path: 输出视频路径
skip_frames: 跳帧数,用于提高处理速度
show_preview: 是否显示实时预览
"""
if not os.path.exists(video_path):
print(f"❌ 视频文件不存在: {video_path}")
return
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 无法打开视频文件: {video_path}")
return
# 获取视频信息
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
# 设置输出视频
if output_path:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps/(skip_frames), (width, height))
else:
out = None
# 处理视频帧
frame_index = 0
processed_frames = 0
start_time = time.time()
print("🚀 开始处理视频...")
while True:
ret, frame = cap.read()
if not ret:
break
# 跳帧处理
if skip_frames > 0 and frame_index % (skip_frames + 1) != 0:
frame_index += 1
continue
# 处理当前帧
processed_frame, results = self.process_frame(frame)
# 写入输出视频
if out:
out.write(processed_frame)
# 显示预览
if show_preview:
# 添加性能信息
fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)}"
cv2.putText(processed_frame, fps_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
cv2.imshow('Video Face Recognition', processed_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_index += 1
processed_frames += 1
# 进度显示
if frame_index % 30 == 0:
progress = (frame_index / total_frames) * 100
print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})")
# 清理资源
cap.release()
if out:
out.release()
if show_preview:
cv2.destroyAllWindows()
# 性能统计
total_time = time.time() - start_time
avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0
print(f"\n🎉 视频处理完成!")
print(f"📊 性能统计:")
print(f" 总处理帧数: {processed_frames}")
print(f" 总耗时: {total_time:.1f}")
print(f" 平均每帧: {avg_processing_time:.1f}ms")
print(f" 实际FPS: {processed_frames / total_time:.1f}")
if output_path:
print(f" 输出视频: {output_path}")
def process_webcam(self, camera_id: int = 0, output_path: str = None):
"""
处理摄像头实时视频流
"""
cap = cv2.VideoCapture(camera_id)
if not cap.isOpened():
print(f"❌ 无法打开摄像头 {camera_id}")
return
# 设置摄像头分辨率(可选)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
# 设置输出视频
if output_path:
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
else:
out = None
print("🎥 开始摄像头实时识别 (按 'q' 退出)...")
while True:
ret, frame = cap.read()
if not ret:
print("❌ 无法读取摄像头帧")
break
# 处理当前帧
processed_frame, results = self.process_frame(frame)
# 添加实时信息
current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0
info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}"
cv2.putText(processed_frame, info_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
# 写入输出
if out:
out.write(processed_frame)
# 显示预览
cv2.imshow('Real-time Face Recognition', processed_frame)
# 按'q'退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# 清理资源
cap.release()
if out:
out.release()
cv2.destroyAllWindows()
print("✅ 摄像头处理结束")
# 使用示例
def main():
# 创建视频识别系统
video_system = VideoFaceRecognition(use_gpu=True)
# 设置目标人脸(可选)
target_image = "test_data/register/person1.png"
# target_image = "test_data/register/cjh.jpg"
if os.path.exists(target_image):
video_system.set_target_face(target_image, "目标人物")
# 选择处理模式
print("请选择处理模式:")
# print("1. 处理视频文件")
# print("2. 实时摄像头")
# choice = input("请输入选择 (1 或 2): ").strip()
choice = "1";
if choice == "1":
# 处理视频文件
video_path = "test_data/video/test_video.mp4"
output_path = "test_data/output_video/zqc.mp4"
# 性能优化:跳帧处理
skip_frames = 1 # 每2帧处理1帧提高速度
video_system.process_video_file(
video_path=video_path,
output_path=output_path,
skip_frames=skip_frames,
show_preview=True
)
elif choice == "2":
# 实时摄像头
output_path = "webcam_recording.mp4" # 可选:保存录制
video_system.process_webcam(
camera_id=0,
output_path=output_path
)
else:
print("❌ 无效选择")
if __name__ == "__main__":
main()