Files
SupervisorAI/video_face_recognition_cann_3.py

275 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# video_face_recognition_cann.py
import cv2
import numpy as np
import time
import os
from algorithm.face_recognition_algorithm import FaceRecognitionAlgorithm
def process_video_file(algorithm: FaceRecognitionAlgorithm, video_path: str, output_path: str = None,
skip_frames: int = 0, show_preview: bool = True):
"""
处理视频文件
Args:
algorithm: FaceRecognitionAlgorithm实例
video_path: 输入视频路径
output_path: 输出视频路径
skip_frames: 跳帧数,用于提高处理速度
show_preview: 是否显示实时预览
"""
if not os.path.exists(video_path):
print(f"❌ 视频文件不存在: {video_path}")
return
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 无法打开视频文件: {video_path}")
return
# 获取视频信息
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
print(f"🎯 当前模式: {algorithm.get_list_mode()}, 注册人脸数: {algorithm.get_registered_face_count()}")
# 设置输出视频
if output_path:
# 确保输出目录存在
os.makedirs(os.path.dirname(output_path), exist_ok=True)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height))
else:
out = None
# 性能统计
frame_count = 0
processed_frames = 0
processing_times = []
start_time = time.time()
print("🚀 开始处理视频...")
while True:
ret, frame = cap.read()
if not ret:
break
# 跳帧处理
if skip_frames > 0 and frame_count % (skip_frames + 1) != 0:
frame_count += 1
continue
# 处理当前帧 - 获取结果
processed_frame, results, processing_time = algorithm.process_frame(frame)
print(f"face process time : {processing_time}")
# 在帧上绘制检测结果
processed_frame = algorithm.draw_detections(processed_frame, results)
# 记录处理时间
processing_times.append(processing_time)
# 写入输出视频
if out:
out.write(processed_frame)
# 显示预览
if show_preview:
# 添加性能信息
fps_text = f"Frame: {frame_count}/{total_frames} | Faces: {len(results)} | Mode: {algorithm.get_list_mode()}"
cv2.putText(processed_frame, fps_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
# 添加名单统计
match_count = sum(1 for r in results if r['is_match'])
list_text = f"Match: {match_count}/{len(results)}"
cv2.putText(processed_frame, list_text, (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
cv2.imshow('Video Face Recognition', processed_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_count += 1
processed_frames += 1
# 进度显示
if frame_count % 30 == 0:
progress = (frame_count / total_frames) * 100
print(f"📊 处理进度: {progress:.1f}% ({frame_count}/{total_frames})")
# 清理资源
cap.release()
if out:
out.release()
if show_preview:
cv2.destroyAllWindows()
# 性能统计
total_time = time.time() - start_time
avg_processing_time = np.mean(processing_times) if processing_times else 0
print(f"\n🎉 视频处理完成!")
print(f"📊 性能统计:")
print(f" 总处理帧数: {processed_frames}")
print(f" 总耗时: {total_time:.1f}")
print(f" 平均每帧: {avg_processing_time:.1f}ms")
print(f" 实际FPS: {processed_frames / total_time:.1f}")
if output_path:
print(f" 输出视频: {output_path}")
def process_webcam(algorithm: FaceRecognitionAlgorithm, camera_id: int = 0, output_path: str = None):
"""
处理摄像头实时视频流
"""
cap = cv2.VideoCapture(camera_id)
if not cap.isOpened():
print(f"❌ 无法打开摄像头 {camera_id}")
return
# 设置摄像头分辨率(可选)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
# 设置输出视频
if output_path:
# 确保输出目录存在
os.makedirs(os.path.dirname(output_path), exist_ok=True)
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
else:
out = None
# 性能统计
processing_times = []
print(f"🎥 开始摄像头实时识别 - 模式: {algorithm.get_list_mode()} (按 'q' 退出)...")
print(f"📋 注册人脸数: {algorithm.get_registered_face_count()}")
while True:
ret, frame = cap.read()
if not ret:
print("❌ 无法读取摄像头帧")
break
# 处理当前帧 - 获取结果
processed_frame, results, processing_time = algorithm.process_frame(frame)
print(f"face process time : {processing_time}")
# 在帧上绘制检测结果
processed_frame = algorithm.draw_detections(processed_frame, results)
# 记录处理时间
processing_times.append(processing_time)
# 添加实时信息
current_fps = 1000 / processing_times[-1] if processing_times else 0
info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)} | Mode: {algorithm.get_list_mode()}"
cv2.putText(processed_frame, info_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
# 添加名单统计
match_count = sum(1 for r in results if r['is_match'])
list_text = f"Match: {match_count}/{len(results)}"
cv2.putText(processed_frame, list_text, (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
# 写入输出
if out:
out.write(processed_frame)
# 显示预览
cv2.imshow('Real-time Face Recognition', processed_frame)
# 按'q'退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# 清理资源
cap.release()
if out:
out.release()
cv2.destroyAllWindows()
print("✅ 摄像头处理结束")
# 使用示例
def main():
# 直接创建人脸识别算法实例
# 使用NPU
# algorithm = FaceRecognitionAlgorithm(use_gpu=False, use_npu=True, npu_device_id=0)
# 使用GPU
algorithm = FaceRecognitionAlgorithm(use_gpu=True, use_npu=False)
# 使用CPU
# algorithm = FaceRecognitionAlgorithm(use_gpu=False, use_npu=False)
# algorithm = FaceRecognitionAlgorithm(use_gpu=True, use_npu=False) # 默认使用GPU
# 设置名单模式
# algorithm.set_list_mode("blacklist") # 黑名单模式
# algorithm.set_list_mode("whitelist") # 白名单模式
# 加载注册人脸
register_dir = "test_data/register" # 注册图片目录
if os.path.exists(register_dir):
algorithm.load_registered_faces(register_dir)
else:
print(f"⚠️ 注册目录不存在: {register_dir}")
#
# # 设置质量阈值(可根据实际情况调整)
# algorithm.set_quality_thresholds(
# clarity_threshold=1000.0, # 清晰度阈值
# quality_threshold=0.6, # 质量得分阈值
# min_face_size=30
# )
# # 选择处理模式
# print("请选择处理模式:")
# print("1. 处理视频文件")
# print("2. 实时摄像头")
#
# choice = input("请输入选择 (1 或 2): ").strip()
choice = "1"
if choice == "1":
# 处理视频文件
video_path = "test_data/video/video_2.mp4"
output_path = "test_data/output_video/video_2_white_9_gpu.mp4"
# output_path = "test_data/output_video/video_2_black_2.mp4"
# 性能优化:跳帧处理
skip_frames = 2
process_video_file(
algorithm=algorithm,
video_path=video_path,
output_path=output_path,
skip_frames=skip_frames,
show_preview=False
)
elif choice == "2":
# 实时摄像头
output_path = "webcam_recording.mp4"
process_webcam(
algorithm=algorithm,
camera_id=0,
output_path=output_path
)
else:
print("❌ 无效选择")
if __name__ == "__main__":
main()