diff --git a/src/cuda_t3.py b/src/cuda_t3.py index 93438f8..2c95232 100644 --- a/src/cuda_t3.py +++ b/src/cuda_t3.py @@ -39,7 +39,7 @@ def comprehensive_gpu_test(): cv2.rectangle(test_image, (100, 100), (200, 200), (255, 255, 255), -1) cv2.rectangle(test_image, (300, 150), (400, 250), (255, 255, 255), -1) cv2.rectangle(test_image, (500, 200), (600, 300), (255, 255, 255), -1) - cv2.imwrite("test_data/multi_face_test.jpg", test_image) + # cv2.imwrite("test_data/multi_face_test.jpg", test_image) print("✅ 测试数据创建完成") diff --git a/src/video_face_recognition.py b/src/video_face_recognition.py new file mode 100644 index 0000000..61729f7 --- /dev/null +++ b/src/video_face_recognition.py @@ -0,0 +1,340 @@ +# video_face_recognition.py +import cv2 +import numpy as np +import time +from insightface.app import FaceAnalysis +from typing import List, Dict, Tuple +import os + + +class VideoFaceRecognition: + """ + 视频人脸识别系统 + 支持实时视频流和视频文件处理 + """ + + def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True): + # 初始化人脸识别模型 + self.app = FaceAnalysis(name=model_name) + self.app.prepare( + ctx_id=0 if use_gpu else -1, + det_thresh=0.3, + det_size=(640, 640) + ) + + self.target_embedding = None + self.target_id = None + self.similarity_threshold = 0.3 + + # 性能统计 + self.frame_count = 0 + self.processing_times = [] + + print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}") + + def set_target_face(self, image_path: str, person_id: str = "target") -> bool: + """设置目标人脸""" + img = cv2.imread(image_path) + if img is None: + print(f"❌ 无法读取目标图像: {image_path}") + return False + + faces = self.app.get(img) + if not faces: + print(f"❌ 目标图像中未检测到人脸: {image_path}") + return False + + self.target_embedding = faces[0].embedding + self.target_id = person_id + print(f"✅ 目标人脸设置: {person_id}") + return True + + def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]: + """ + 处理单帧图像 + 返回: (处理后的帧, 识别结果列表) + """ + start_time = time.time() + + # 人脸检测和识别 + faces = self.app.get(frame) + + results = [] + for face in faces: + similarity = 0.0 + if self.target_embedding is not None: + # 计算相似度 + emb1 = face.embedding / np.linalg.norm(face.embedding) + emb2 = self.target_embedding / np.linalg.norm(self.target_embedding) + similarity = float(np.dot(emb1, emb2)) + + result = { + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'is_match': similarity >= self.similarity_threshold, + 'gender': 'Male' if face.gender == 1 else 'Female', + 'age': int(face.age), + 'det_score': float(face.det_score) + } + results.append(result) + + # 在帧上绘制结果 + frame = self._draw_detection(frame, result) + + # 性能统计 + processing_time = (time.time() - start_time) * 1000 + self.processing_times.append(processing_time) + self.frame_count += 1 + + return frame, results + + def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray: + """在帧上绘制检测结果""" + bbox = result['bbox'] + similarity = result['similarity'] + is_match = result['is_match'] + + # 选择颜色 + if is_match: + color = (0, 255, 0) # 绿色 - 匹配 + # elif similarity > self.similarity_threshold/2: + # color = (0, 255, 255) # 黄色 - 中等相似度 + else: + color = (0, 0, 255) # 红色 - 不匹配 + + # 绘制人脸框 + x1, y1, x2, y2 = bbox + cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) + + # 绘制信息文本 + if self.target_id: + status = f"MATCH: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}" + # text = f"{self.target_id}: {status}" + text = status + else: + text = f"Similarity: {similarity:.3f}" + + # 文本背景 + text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0] + cv2.rectangle(frame, (x1, y1 - text_size[1] - 10), (x1 + text_size[0], y1), color, -1) + + # 文本 + cv2.putText(frame, text, (x1, y1 - 5), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) + + # 详细信息 + info_text = f"{result['gender']}/{result['age']}" + cv2.putText(frame, info_text, (x1, y2 + 20), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + + return frame + + def process_video_file(self, video_path: str, output_path: str = None, + skip_frames: int = 0, show_preview: bool = True): + """ + 处理视频文件 + + Args: + video_path: 输入视频路径 + output_path: 输出视频路径 + skip_frames: 跳帧数,用于提高处理速度 + show_preview: 是否显示实时预览 + """ + if not os.path.exists(video_path): + print(f"❌ 视频文件不存在: {video_path}") + return + + # 打开视频文件 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + print(f"❌ 无法打开视频文件: {video_path}") + return + + # 获取视频信息 + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + + print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}") + + # 设置输出视频 + if output_path: + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps/(skip_frames), (width, height)) + else: + out = None + + # 处理视频帧 + frame_index = 0 + processed_frames = 0 + start_time = time.time() + + print("🚀 开始处理视频...") + + while True: + ret, frame = cap.read() + if not ret: + break + + # 跳帧处理 + if skip_frames > 0 and frame_index % (skip_frames + 1) != 0: + frame_index += 1 + continue + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 写入输出视频 + if out: + out.write(processed_frame) + + # 显示预览 + if show_preview: + # 添加性能信息 + fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)}" + cv2.putText(processed_frame, fps_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + cv2.imshow('Video Face Recognition', processed_frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + frame_index += 1 + processed_frames += 1 + + # 进度显示 + if frame_index % 30 == 0: + progress = (frame_index / total_frames) * 100 + print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})") + + # 清理资源 + cap.release() + if out: + out.release() + if show_preview: + cv2.destroyAllWindows() + + # 性能统计 + total_time = time.time() - start_time + avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0 + + print(f"\n🎉 视频处理完成!") + print(f"📊 性能统计:") + print(f" 总处理帧数: {processed_frames}") + print(f" 总耗时: {total_time:.1f}秒") + print(f" 平均每帧: {avg_processing_time:.1f}ms") + print(f" 实际FPS: {processed_frames / total_time:.1f}") + if output_path: + print(f" 输出视频: {output_path}") + + def process_webcam(self, camera_id: int = 0, output_path: str = None): + """ + 处理摄像头实时视频流 + """ + cap = cv2.VideoCapture(camera_id) + if not cap.isOpened(): + print(f"❌ 无法打开摄像头 {camera_id}") + return + + # 设置摄像头分辨率(可选) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) + + # 设置输出视频 + if output_path: + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + else: + out = None + + print("🎥 开始摄像头实时识别 (按 'q' 退出)...") + + while True: + ret, frame = cap.read() + if not ret: + print("❌ 无法读取摄像头帧") + break + + # 处理当前帧 + processed_frame, results = self.process_frame(frame) + + # 添加实时信息 + current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0 + info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}" + cv2.putText(processed_frame, info_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 写入输出 + if out: + out.write(processed_frame) + + # 显示预览 + cv2.imshow('Real-time Face Recognition', processed_frame) + + # 按'q'退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + # 清理资源 + cap.release() + if out: + out.release() + cv2.destroyAllWindows() + + print("✅ 摄像头处理结束") + + +# 使用示例 +def main(): + # 创建视频识别系统 + video_system = VideoFaceRecognition(use_gpu=True) + + # 设置目标人脸(可选) + target_image = "test_data/register/person1.png" + # target_image = "test_data/register/cjh.jpg" + + if os.path.exists(target_image): + video_system.set_target_face(target_image, "目标人物") + + # 选择处理模式 + print("请选择处理模式:") + # print("1. 处理视频文件") + # print("2. 实时摄像头") + + # choice = input("请输入选择 (1 或 2): ").strip() + choice = "1"; + + if choice == "1": + # 处理视频文件 + video_path = "test_data/video/test_video.mp4" + output_path = "test_data/output_video/zqc.mp4" + + # 性能优化:跳帧处理 + skip_frames = 1 # 每2帧处理1帧,提高速度 + + video_system.process_video_file( + video_path=video_path, + output_path=output_path, + skip_frames=skip_frames, + show_preview=True + ) + + elif choice == "2": + # 实时摄像头 + output_path = "webcam_recording.mp4" # 可选:保存录制 + + video_system.process_webcam( + camera_id=0, + output_path=output_path + ) + + else: + print("❌ 无效选择") + + +if __name__ == "__main__": + main() \ No newline at end of file