275 lines
8.7 KiB
Python
275 lines
8.7 KiB
Python
# video_face_recognition_cann.py
|
||
import cv2
|
||
import numpy as np
|
||
import time
|
||
import os
|
||
from algorithm.face_recognition_algorithm import FaceRecognitionAlgorithm
|
||
|
||
|
||
def process_video_file(algorithm: FaceRecognitionAlgorithm, video_path: str, output_path: str = None,
|
||
skip_frames: int = 0, show_preview: bool = True):
|
||
"""
|
||
处理视频文件
|
||
|
||
Args:
|
||
algorithm: FaceRecognitionAlgorithm实例
|
||
video_path: 输入视频路径
|
||
output_path: 输出视频路径
|
||
skip_frames: 跳帧数,用于提高处理速度
|
||
show_preview: 是否显示实时预览
|
||
"""
|
||
if not os.path.exists(video_path):
|
||
print(f"❌ 视频文件不存在: {video_path}")
|
||
return
|
||
|
||
# 打开视频文件
|
||
cap = cv2.VideoCapture(video_path)
|
||
if not cap.isOpened():
|
||
print(f"❌ 无法打开视频文件: {video_path}")
|
||
return
|
||
|
||
# 获取视频信息
|
||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||
|
||
print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
|
||
print(f"🎯 当前模式: {algorithm.get_list_mode()}, 注册人脸数: {algorithm.get_registered_face_count()}")
|
||
|
||
# 设置输出视频
|
||
if output_path:
|
||
# 确保输出目录存在
|
||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
||
out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height))
|
||
else:
|
||
out = None
|
||
|
||
# 性能统计
|
||
frame_count = 0
|
||
processed_frames = 0
|
||
processing_times = []
|
||
start_time = time.time()
|
||
|
||
print("🚀 开始处理视频...")
|
||
|
||
while True:
|
||
ret, frame = cap.read()
|
||
if not ret:
|
||
break
|
||
|
||
# 跳帧处理
|
||
if skip_frames > 0 and frame_count % (skip_frames + 1) != 0:
|
||
frame_count += 1
|
||
continue
|
||
|
||
# 处理当前帧 - 获取结果
|
||
processed_frame, results, processing_time = algorithm.process_frame(frame)
|
||
print(f"face process time : {processing_time}")
|
||
|
||
# 在帧上绘制检测结果
|
||
processed_frame = algorithm.draw_detections(processed_frame, results)
|
||
|
||
# 记录处理时间
|
||
processing_times.append(processing_time)
|
||
|
||
# 写入输出视频
|
||
if out:
|
||
out.write(processed_frame)
|
||
|
||
# 显示预览
|
||
if show_preview:
|
||
# 添加性能信息
|
||
fps_text = f"Frame: {frame_count}/{total_frames} | Faces: {len(results)} | Mode: {algorithm.get_list_mode()}"
|
||
cv2.putText(processed_frame, fps_text, (10, 30),
|
||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
||
|
||
# 添加名单统计
|
||
match_count = sum(1 for r in results if r['is_match'])
|
||
list_text = f"Match: {match_count}/{len(results)}"
|
||
cv2.putText(processed_frame, list_text, (10, 60),
|
||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
||
|
||
cv2.imshow('Video Face Recognition', processed_frame)
|
||
if cv2.waitKey(1) & 0xFF == ord('q'):
|
||
break
|
||
|
||
frame_count += 1
|
||
processed_frames += 1
|
||
|
||
# 进度显示
|
||
if frame_count % 30 == 0:
|
||
progress = (frame_count / total_frames) * 100
|
||
print(f"📊 处理进度: {progress:.1f}% ({frame_count}/{total_frames})")
|
||
|
||
# 清理资源
|
||
cap.release()
|
||
if out:
|
||
out.release()
|
||
if show_preview:
|
||
cv2.destroyAllWindows()
|
||
|
||
# 性能统计
|
||
total_time = time.time() - start_time
|
||
avg_processing_time = np.mean(processing_times) if processing_times else 0
|
||
|
||
print(f"\n🎉 视频处理完成!")
|
||
print(f"📊 性能统计:")
|
||
print(f" 总处理帧数: {processed_frames}")
|
||
print(f" 总耗时: {total_time:.1f}秒")
|
||
print(f" 平均每帧: {avg_processing_time:.1f}ms")
|
||
print(f" 实际FPS: {processed_frames / total_time:.1f}")
|
||
if output_path:
|
||
print(f" 输出视频: {output_path}")
|
||
|
||
|
||
def process_webcam(algorithm: FaceRecognitionAlgorithm, camera_id: int = 0, output_path: str = None):
|
||
"""
|
||
处理摄像头实时视频流
|
||
"""
|
||
cap = cv2.VideoCapture(camera_id)
|
||
if not cap.isOpened():
|
||
print(f"❌ 无法打开摄像头 {camera_id}")
|
||
return
|
||
|
||
# 设置摄像头分辨率(可选)
|
||
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
|
||
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
|
||
|
||
# 设置输出视频
|
||
if output_path:
|
||
# 确保输出目录存在
|
||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
||
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
|
||
else:
|
||
out = None
|
||
|
||
# 性能统计
|
||
processing_times = []
|
||
|
||
print(f"🎥 开始摄像头实时识别 - 模式: {algorithm.get_list_mode()} (按 'q' 退出)...")
|
||
print(f"📋 注册人脸数: {algorithm.get_registered_face_count()}")
|
||
|
||
while True:
|
||
ret, frame = cap.read()
|
||
if not ret:
|
||
print("❌ 无法读取摄像头帧")
|
||
break
|
||
|
||
# 处理当前帧 - 获取结果
|
||
processed_frame, results, processing_time = algorithm.process_frame(frame)
|
||
print(f"face process time : {processing_time}")
|
||
# 在帧上绘制检测结果
|
||
processed_frame = algorithm.draw_detections(processed_frame, results)
|
||
|
||
# 记录处理时间
|
||
processing_times.append(processing_time)
|
||
|
||
# 添加实时信息
|
||
current_fps = 1000 / processing_times[-1] if processing_times else 0
|
||
info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)} | Mode: {algorithm.get_list_mode()}"
|
||
cv2.putText(processed_frame, info_text, (10, 30),
|
||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
||
|
||
# 添加名单统计
|
||
match_count = sum(1 for r in results if r['is_match'])
|
||
list_text = f"Match: {match_count}/{len(results)}"
|
||
cv2.putText(processed_frame, list_text, (10, 60),
|
||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
||
|
||
# 写入输出
|
||
if out:
|
||
out.write(processed_frame)
|
||
|
||
# 显示预览
|
||
cv2.imshow('Real-time Face Recognition', processed_frame)
|
||
|
||
# 按'q'退出
|
||
if cv2.waitKey(1) & 0xFF == ord('q'):
|
||
break
|
||
|
||
# 清理资源
|
||
cap.release()
|
||
if out:
|
||
out.release()
|
||
cv2.destroyAllWindows()
|
||
|
||
print("✅ 摄像头处理结束")
|
||
|
||
|
||
# 使用示例
|
||
def main():
|
||
# 直接创建人脸识别算法实例
|
||
# 使用NPU:
|
||
# algorithm = FaceRecognitionAlgorithm(use_gpu=False, use_npu=True, npu_device_id=0)
|
||
# 使用GPU:
|
||
algorithm = FaceRecognitionAlgorithm(use_gpu=True, use_npu=False)
|
||
# 使用CPU:
|
||
# algorithm = FaceRecognitionAlgorithm(use_gpu=False, use_npu=False)
|
||
# algorithm = FaceRecognitionAlgorithm(use_gpu=True, use_npu=False) # 默认使用GPU
|
||
|
||
# 设置名单模式
|
||
# algorithm.set_list_mode("blacklist") # 黑名单模式
|
||
# algorithm.set_list_mode("whitelist") # 白名单模式
|
||
|
||
# 加载注册人脸
|
||
register_dir = "test_data/register" # 注册图片目录
|
||
if os.path.exists(register_dir):
|
||
algorithm.load_registered_faces(register_dir)
|
||
else:
|
||
print(f"⚠️ 注册目录不存在: {register_dir}")
|
||
#
|
||
# # 设置质量阈值(可根据实际情况调整)
|
||
# algorithm.set_quality_thresholds(
|
||
# clarity_threshold=1000.0, # 清晰度阈值
|
||
# quality_threshold=0.6, # 质量得分阈值
|
||
# min_face_size=30
|
||
# )
|
||
|
||
# # 选择处理模式
|
||
# print("请选择处理模式:")
|
||
# print("1. 处理视频文件")
|
||
# print("2. 实时摄像头")
|
||
#
|
||
# choice = input("请输入选择 (1 或 2): ").strip()
|
||
|
||
choice = "1"
|
||
|
||
if choice == "1":
|
||
# 处理视频文件
|
||
video_path = "test_data/video/video_2.mp4"
|
||
output_path = "test_data/output_video/video_2_white_9_gpu.mp4"
|
||
# output_path = "test_data/output_video/video_2_black_2.mp4"
|
||
|
||
# 性能优化:跳帧处理
|
||
skip_frames = 2
|
||
|
||
process_video_file(
|
||
algorithm=algorithm,
|
||
video_path=video_path,
|
||
output_path=output_path,
|
||
skip_frames=skip_frames,
|
||
show_preview=False
|
||
)
|
||
|
||
elif choice == "2":
|
||
# 实时摄像头
|
||
output_path = "webcam_recording.mp4"
|
||
|
||
process_webcam(
|
||
algorithm=algorithm,
|
||
camera_id=0,
|
||
output_path=output_path
|
||
)
|
||
|
||
else:
|
||
print("❌ 无效选择")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |