# video_face_recognition_cann.py import cv2 import numpy as np import time import os from algorithm.face_recognition_algorithm import FaceRecognitionAlgorithm def process_video_file(algorithm: FaceRecognitionAlgorithm, video_path: str, output_path: str = None, skip_frames: int = 0, show_preview: bool = True): """ 处理视频文件 Args: algorithm: FaceRecognitionAlgorithm实例 video_path: 输入视频路径 output_path: 输出视频路径 skip_frames: 跳帧数,用于提高处理速度 show_preview: 是否显示实时预览 """ if not os.path.exists(video_path): print(f"❌ 视频文件不存在: {video_path}") return # 打开视频文件 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"❌ 无法打开视频文件: {video_path}") return # 获取视频信息 fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}") print(f"🎯 当前模式: {algorithm.get_list_mode()}, 注册人脸数: {algorithm.get_registered_face_count()}") # 设置输出视频 if output_path: # 确保输出目录存在 os.makedirs(os.path.dirname(output_path), exist_ok=True) fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height)) else: out = None # 性能统计 frame_count = 0 processed_frames = 0 processing_times = [] start_time = time.time() print("🚀 开始处理视频...") while True: ret, frame = cap.read() if not ret: break # 跳帧处理 if skip_frames > 0 and frame_count % (skip_frames + 1) != 0: frame_count += 1 continue # 处理当前帧 - 获取结果 processed_frame, results, processing_time = algorithm.process_frame(frame) print(f"face process time : {processing_time}") # 在帧上绘制检测结果 processed_frame = algorithm.draw_detections(processed_frame, results) # 记录处理时间 processing_times.append(processing_time) # 写入输出视频 if out: out.write(processed_frame) # 显示预览 if show_preview: # 添加性能信息 fps_text = f"Frame: {frame_count}/{total_frames} | Faces: {len(results)} | Mode: {algorithm.get_list_mode()}" cv2.putText(processed_frame, fps_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) # 添加名单统计 match_count = sum(1 for r in results if r['is_match']) list_text = f"Match: {match_count}/{len(results)}" cv2.putText(processed_frame, list_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) cv2.imshow('Video Face Recognition', processed_frame) if cv2.waitKey(1) & 0xFF == ord('q'): break frame_count += 1 processed_frames += 1 # 进度显示 if frame_count % 30 == 0: progress = (frame_count / total_frames) * 100 print(f"📊 处理进度: {progress:.1f}% ({frame_count}/{total_frames})") # 清理资源 cap.release() if out: out.release() if show_preview: cv2.destroyAllWindows() # 性能统计 total_time = time.time() - start_time avg_processing_time = np.mean(processing_times) if processing_times else 0 print(f"\n🎉 视频处理完成!") print(f"📊 性能统计:") print(f" 总处理帧数: {processed_frames}") print(f" 总耗时: {total_time:.1f}秒") print(f" 平均每帧: {avg_processing_time:.1f}ms") print(f" 实际FPS: {processed_frames / total_time:.1f}") if output_path: print(f" 输出视频: {output_path}") def process_webcam(algorithm: FaceRecognitionAlgorithm, camera_id: int = 0, output_path: str = None): """ 处理摄像头实时视频流 """ cap = cv2.VideoCapture(camera_id) if not cap.isOpened(): print(f"❌ 无法打开摄像头 {camera_id}") return # 设置摄像头分辨率(可选) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) # 设置输出视频 if output_path: # 确保输出目录存在 os.makedirs(os.path.dirname(output_path), exist_ok=True) fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) else: out = None # 性能统计 processing_times = [] print(f"🎥 开始摄像头实时识别 - 模式: {algorithm.get_list_mode()} (按 'q' 退出)...") print(f"📋 注册人脸数: {algorithm.get_registered_face_count()}") while True: ret, frame = cap.read() if not ret: print("❌ 无法读取摄像头帧") break # 处理当前帧 - 获取结果 processed_frame, results, processing_time = algorithm.process_frame(frame) print(f"face process time : {processing_time}") # 在帧上绘制检测结果 processed_frame = algorithm.draw_detections(processed_frame, results) # 记录处理时间 processing_times.append(processing_time) # 添加实时信息 current_fps = 1000 / processing_times[-1] if processing_times else 0 info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)} | Mode: {algorithm.get_list_mode()}" cv2.putText(processed_frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) # 添加名单统计 match_count = sum(1 for r in results if r['is_match']) list_text = f"Match: {match_count}/{len(results)}" cv2.putText(processed_frame, list_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) # 写入输出 if out: out.write(processed_frame) # 显示预览 cv2.imshow('Real-time Face Recognition', processed_frame) # 按'q'退出 if cv2.waitKey(1) & 0xFF == ord('q'): break # 清理资源 cap.release() if out: out.release() cv2.destroyAllWindows() print("✅ 摄像头处理结束") # 使用示例 def main(): # 直接创建人脸识别算法实例 # 使用NPU: # algorithm = FaceRecognitionAlgorithm(use_gpu=False, use_npu=True, npu_device_id=0) # 使用GPU: algorithm = FaceRecognitionAlgorithm(use_gpu=True, use_npu=False) # 使用CPU: # algorithm = FaceRecognitionAlgorithm(use_gpu=False, use_npu=False) # algorithm = FaceRecognitionAlgorithm(use_gpu=True, use_npu=False) # 默认使用GPU # 设置名单模式 # algorithm.set_list_mode("blacklist") # 黑名单模式 # algorithm.set_list_mode("whitelist") # 白名单模式 # 加载注册人脸 register_dir = "test_data/register" # 注册图片目录 if os.path.exists(register_dir): algorithm.load_registered_faces(register_dir) else: print(f"⚠️ 注册目录不存在: {register_dir}") # # # 设置质量阈值(可根据实际情况调整) # algorithm.set_quality_thresholds( # clarity_threshold=1000.0, # 清晰度阈值 # quality_threshold=0.6, # 质量得分阈值 # min_face_size=30 # ) # # 选择处理模式 # print("请选择处理模式:") # print("1. 处理视频文件") # print("2. 实时摄像头") # # choice = input("请输入选择 (1 或 2): ").strip() choice = "1" if choice == "1": # 处理视频文件 video_path = "test_data/video/video_2.mp4" output_path = "test_data/output_video/video_2_white_9_gpu.mp4" # output_path = "test_data/output_video/video_2_black_2.mp4" # 性能优化:跳帧处理 skip_frames = 2 process_video_file( algorithm=algorithm, video_path=video_path, output_path=output_path, skip_frames=skip_frames, show_preview=False ) elif choice == "2": # 实时摄像头 output_path = "webcam_recording.mp4" process_webcam( algorithm=algorithm, camera_id=0, output_path=output_path ) else: print("❌ 无效选择") if __name__ == "__main__": main()