diff --git a/src/cuda_t.py b/src/cuda_t.py new file mode 100644 index 0000000..e93c415 --- /dev/null +++ b/src/cuda_t.py @@ -0,0 +1,242 @@ +import torch +import onnxruntime as ort +import insightface +import subprocess +import sys +import os + + +def detailed_diagnosis(): + """详细诊断脚本""" + print("=" * 60) + print("详细CUDA诊断") + print("=" * 60) + + # 1. 系统信息 + print("\n📋 1. 系统信息:") + print(f"Python版本: {sys.version}") + print(f"Python路径: {sys.executable}") + print(f"Conda环境: {sys.prefix}") + + # 2. PyTorch详细信息 + print("\n🔥 2. PyTorch详细信息:") + print(f"PyTorch版本: {torch.__version__}") + print(f"PyTorch路径: {torch.__file__}") + print(f"CUDA可用: {torch.cuda.is_available()}") + + if torch.cuda.is_available(): + print(f"CUDA版本: {torch.version.cuda}") + print(f"GPU数量: {torch.cuda.device_count()}") + for i in range(torch.cuda.device_count()): + print(f" GPU {i}: {torch.cuda.get_device_name(i)}") + print(f" 内存: {torch.cuda.get_device_properties(i).total_memory / 1024 ** 3:.1f} GB") + else: + print("❌ PyTorch无法使用CUDA") + # 检查可能的原因 + print("\n🔍 PyTorch CUDA问题排查:") + print(f" torch.cuda.is_available(): {torch.cuda.is_available()}") + try: + print(f" CUDA设备数量: {torch.cuda.device_count()}") + except: + print(" CUDA设备数量: 无法获取") + + # 3. ONNX Runtime详细信息 + print("\n⚡ 3. ONNX Runtime详细信息:") + print(f"ONNX Runtime版本: {ort.__version__}") + print(f"ONNX Runtime路径: {ort.__file__}") + + available_providers = ort.get_available_providers() + print(f"可用Providers: {available_providers}") + + if 'CUDAExecutionProvider' in available_providers: + print("✅ CUDAExecutionProvider可用") + # 测试CUDA provider + try: + options = ort.SessionOptions() + session = ort.InferenceSession( + os.path.join(os.path.dirname(insightface.__file__), 'models', 'buffalo_l', '1k3d68.onx'), + providers=['CUDAExecutionProvider'], + sess_options=options + ) + print("✅ CUDAExecutionProvider测试通过") + except Exception as e: + print(f"❌ CUDAExecutionProvider测试失败: {e}") + else: + print("❌ CUDAExecutionProvider不可用") + + # 4. InsightFace信息 + print("\n👁️ 4. InsightFace信息:") + print(f"InsightFace版本: {insightface.__version__}") + print(f"InsightFace路径: {insightface.__file__}") + + # 5. 系统CUDA检查 + print("\n🖥️ 5. 系统CUDA检查:") + try: + # 检查nvidia-smi + result = subprocess.run(['nvidia-smi'], capture_output=True, text=True, timeout=10) + if result.returncode == 0: + print("✅ nvidia-smi可用") + # 提取关键信息 + lines = result.stdout.split('\n') + for line in lines: + if 'Driver Version' in line: + print(f" 驱动版本: {line.strip()}") + if 'CUDA Version' in line: + print(f" CUDA版本: {line.strip()}") + else: + print("❌ nvidia-smi不可用") + except (subprocess.TimeoutExpired, FileNotFoundError, Exception) as e: + print(f"❌ nvidia-smi执行失败: {e}") + + try: + # 检查nvcc + result = subprocess.run(['nvcc', '--version'], capture_output=True, text=True, timeout=5) + if result.returncode == 0: + print("✅ nvcc可用") + version_line = result.stdout.split('\n')[3] if len(result.stdout.split('\n')) > 3 else result.stdout + print(f" {version_line.strip()}") + else: + print("❌ nvcc不可用") + except (subprocess.TimeoutExpired, FileNotFoundError, Exception) as e: + print(f"❌ nvcc执行失败: {e}") + + # 6. 环境变量检查 + print("\n🌍 6. 环境变量检查:") + cuda_paths = [] + for key, value in os.environ.items(): + if 'CUDA' in key.upper() or 'CUDNN' in key.upper(): + print(f" {key}: {value}") + if 'PATH' in key or 'HOME' in key: + cuda_paths.append((key, value)) + + # 7. 包版本兼容性检查 + print("\n📦 7. 包版本兼容性检查:") + try: + import pkg_resources + packages = ['torch', 'torchvision', 'torchaudio', 'onnxruntime', 'insightface', 'opencv-python', 'numpy'] + for pkg in packages: + try: + version = pkg_resources.get_distribution(pkg).version + print(f" {pkg}: {version}") + except: + print(f" {pkg}: 未安装") + except: + print(" 无法检查包版本") + + # 8. 实际性能测试 + print("\n🚀 8. 实际性能测试:") + try: + # 测试InsightFace实际使用 + app_cpu = insightface.app.FaceAnalysis(name='buffalo_l') + app_cpu.prepare(ctx_id=-1) # 强制CPU + + app_gpu = insightface.app.FaceAnalysis(name='buffalo_l') + app_gpu.prepare(ctx_id=0) # 强制GPU + + # 创建测试图像 + import numpy as np + import cv2 + test_img = np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8) + + # CPU测试 + import time + print(" CPU测试...") + start_time = time.time() + for _ in range(5): + faces_cpu = app_cpu.get(test_img) + cpu_time = (time.time() - start_time) * 1000 / 5 + + # GPU测试 + print(" GPU测试...") + start_time = time.time() + for _ in range(5): + faces_gpu = app_gpu.get(test_img) + gpu_time = (time.time() - start_time) * 1000 / 5 + + print(f" CPU平均时间: {cpu_time:.1f}ms") + print(f" GPU平均时间: {gpu_time:.1f}ms") + + if gpu_time < cpu_time * 0.8: # GPU应该比CPU快 + print(" ✅ GPU加速生效") + else: + print(" ⚠️ GPU加速未生效或效果不明显") + + except Exception as e: + print(f" ❌ 性能测试失败: {e}") + + +def check_package_installation(): + """检查包安装情况""" + print("\n" + "=" * 60) + print("包安装检查") + print("=" * 60) + + packages = { + 'torch': 'PyTorch (深度学习框架)', + 'torchvision': 'PyTorch视觉库', + 'torchaudio': 'PyTorch音频库', + 'onnxruntime': 'ONNX Runtime (推理引擎)', + 'insightface': '人脸识别库', + 'opencv-python': 'OpenCV (图像处理)', + 'numpy': '数值计算库' + } + + for pkg, desc in packages.items(): + try: + if pkg == 'torch': + import torch + version = torch.__version__ + cuda_status = "✅ CUDA可用" if torch.cuda.is_available() else "❌ CUDA不可用" + print(f"{pkg} ({desc}): {version} {cuda_status}") + elif pkg == 'onnxruntime': + import onnxruntime as ort + version = ort.__version__ + providers = ort.get_available_providers() + cuda_status = "✅ 有GPU支持" if 'CUDAExecutionProvider' in providers else "❌ 无GPU支持" + print(f"{pkg} ({desc}): {version} {cuda_status}") + else: + module = __import__(pkg) + version = getattr(module, '__version__', '未知版本') + print(f"{pkg} ({desc}): {version}") + except ImportError: + print(f"{pkg} ({desc}): ❌ 未安装") + + +if __name__ == "__main__": + detailed_diagnosis() + check_package_installation() + + # 提供解决方案 + print("\n" + "=" * 60) + print("解决方案建议") + print("=" * 60) + + # 基于诊断结果给出建议 + if not torch.cuda.is_available(): + print("\n❌ 主要问题: PyTorch没有CUDA支持") + print("💡 解决方案:") + print("1. 完全卸载当前PyTorch:") + print(" pip uninstall torch torchvision torchaudio") + print("2. 安装GPU版本的PyTorch:") + print(" pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118") + + if 'CUDAExecutionProvider' not in ort.get_available_providers(): + print("\n❌ 主要问题: ONNX Runtime没有GPU支持") + print("💡 解决方案:") + print("1. 卸载CPU版本:") + print(" pip uninstall onnxruntime") + print("2. 安装GPU版本:") + print(" pip install onnxruntime-gpu") + + # 通用建议 + print("\n🔄 通用建议:") + print("1. 创建全新的conda环境:") + print(" conda create -n face_gpu python=3.10") + print(" conda activate face_gpu") + print("2. 按顺序安装:") + print(" pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118") + print(" pip install onnxruntime-gpu") + print(" pip install insightface opencv-python") + print("3. 验证安装:") + print( + " python -c \"import torch; print(f'PyTorch CUDA: {torch.cuda.is_available()}'); import onnxruntime as ort; print(f'ONNX Providers: {ort.get_available_providers()}')\"") \ No newline at end of file diff --git a/src/cuda_t2.py b/src/cuda_t2.py new file mode 100644 index 0000000..93438f8 --- /dev/null +++ b/src/cuda_t2.py @@ -0,0 +1,200 @@ +# final_gpu_test.py +import torch +import onnxruntime as ort +import insightface +import cv2 +import numpy as np +import time +import os +from typing import List, Dict + + +def comprehensive_gpu_test(): + """全面的GPU测试""" + print("=" * 60) + print("最终GPU加速测试") + print("=" * 60) + + # 1. 环境验证 + print("1. 环境验证:") + print(f"✅ PyTorch: {torch.__version__}") + print(f"✅ PyTorch CUDA: {torch.cuda.is_available()}") + print(f"✅ GPU设备: {torch.cuda.get_device_name(0)}") + print(f"✅ ONNX Runtime GPU支持: {'CUDAExecutionProvider' in ort.get_available_providers()}") + print(f"✅ OpenCV: {cv2.__version__}") + print(f"✅ InsightFace: {insightface.__version__}") + + # 2. 创建测试数据 + print("\n2. 创建测试数据...") + os.makedirs("test_data", exist_ok=True) + + # 创建目标人脸 + target_face = np.random.randint(0, 255, (400, 400, 3), dtype=np.uint8) + cv2.rectangle(target_face, (150, 150), (250, 250), (255, 255, 255), -1) # 简单的人脸模拟 + cv2.imwrite("test_data/target_face.jpg", target_face) + + # 创建测试图像(多个人脸) + test_image = np.random.randint(0, 255, (800, 600, 3), dtype=np.uint8) + # 添加多个人脸区域 + cv2.rectangle(test_image, (100, 100), (200, 200), (255, 255, 255), -1) + cv2.rectangle(test_image, (300, 150), (400, 250), (255, 255, 255), -1) + cv2.rectangle(test_image, (500, 200), (600, 300), (255, 255, 255), -1) + cv2.imwrite("test_data/multi_face_test.jpg", test_image) + + print("✅ 测试数据创建完成") + + # 3. GPU加速的人脸识别系统 + print("\n3. 初始化GPU人脸识别系统...") + + class FastGPUFaceRecognition: + def __init__(self): + self.app = insightface.app.FaceAnalysis(name='buffalo_l') + self.app.prepare( + ctx_id=0, # GPU 0 + det_thresh=0.3, # 检测阈值 + det_size=(640, 640) + ) + self.target_embedding = None + print("✅ GPU人脸识别系统初始化完成") + + def set_target_face(self, image_path: str): + """设置目标人脸""" + img = cv2.imread(image_path) + faces = self.app.get(img) + if faces: + self.target_embedding = faces[0].embedding + print(f"✅ 目标人脸特征提取完成") + return True + return False + + def process_image(self, image_path: str, output_path: str): + """处理图像并返回结果""" + start_time = time.time() + + img = cv2.imread(image_path) + if img is None: + return None + + # GPU推理 + faces = self.app.get(img) + + results = [] + for i, face in enumerate(faces): + similarity = 0.0 + if self.target_embedding is not None: + # 计算相似度 + emb1 = face.embedding / np.linalg.norm(face.embedding) + emb2 = self.target_embedding / np.linalg.norm(self.target_embedding) + similarity = float(np.dot(emb1, emb2)) + + results.append({ + 'face_index': i, + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'det_score': face.det_score + }) + + # 绘制结果 + bbox = face.bbox.astype(int) + color = (0, 255, 0) if similarity > 0.6 else (0, 0, 255) + cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2) + cv2.putText(img, f"{similarity:.3f}", (bbox[0], bbox[1] - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) + + # 保存结果 + cv2.imwrite(output_path, img) + + processing_time = (time.time() - start_time) * 1000 + return { + 'faces_detected': len(faces), + 'processing_time_ms': processing_time, + 'results': results + } + + # 初始化系统 + system = FastGPUFaceRecognition() + + # 4. 性能测试 + print("\n4. 性能测试:") + + # 设置目标人脸 + if system.set_target_face("test_data/target_face.jpg"): + # 处理测试图像 + result = system.process_image("test_data/multi_face_test.jpg", "test_data/gpu_result.jpg") + + if result: + print(f"✅ 处理完成:") + print(f" 检测到人脸数: {result['faces_detected']}") + print(f" 处理时间: {result['processing_time_ms']:.1f}ms") + + for face in result['results']: + status = "匹配" if face['similarity'] > 0.6 else "不匹配" + print(f" 人脸 {face['face_index'] + 1}: 相似度 {face['similarity']:.3f} ({status})") + + # 5. 批量性能测试 + print("\n5. 批量性能测试:") + + # 创建多个测试图像 + test_images = [] + for i in range(5): + test_img = np.random.randint(0, 255, (600, 800, 3), dtype=np.uint8) + cv2.rectangle(test_img, (100, 100), (200, 200), (255, 255, 255), -1) + img_path = f"test_data/batch_test_{i}.jpg" + cv2.imwrite(img_path, test_img) + test_images.append(img_path) + + total_time = 0 + total_faces = 0 + + for i, img_path in enumerate(test_images): + result = system.process_image(img_path, f"test_data/batch_result_{i}.jpg") + if result: + total_time += result['processing_time_ms'] + total_faces += result['faces_detected'] + print(f" 图像 {i + 1}: {result['processing_time_ms']:.1f}ms, {result['faces_detected']}张人脸") + + if len(test_images) > 0: + avg_time = total_time / len(test_images) + print(f" 平均处理时间: {avg_time:.1f}ms/张") + print(f" 总检测人脸: {total_faces}张") + + # 6. 与CPU性能对比(可选) + print("\n6. GPU vs CPU 性能对比:") + try: + # GPU测试 + gpu_times = [] + test_img = cv2.imread("test_data/multi_face_test.jpg") + for _ in range(10): + start = time.time() + faces = system.app.get(test_img) + gpu_times.append((time.time() - start) * 1000) + + # CPU测试(创建新的CPU实例) + cpu_app = insightface.app.FaceAnalysis(name='buffalo_l') + cpu_app.prepare(ctx_id=-1) # CPU + + cpu_times = [] + for _ in range(10): + start = time.time() + faces = cpu_app.get(test_img) + cpu_times.append((time.time() - start) * 1000) + + avg_gpu = np.mean(gpu_times) + avg_cpu = np.mean(cpu_times) + + print(f" GPU平均推理时间: {avg_gpu:.1f}ms") + print(f" CPU平均推理时间: {avg_cpu:.1f}ms") + print(f" GPU加速比: {avg_cpu / avg_gpu:.1f}x") + + except Exception as e: + print(f" 性能对比测试跳过: {e}") + + print("\n" + "=" * 60) + print("🎉 GPU加速测试完成!") + print("✅ 现在您的人脸识别系统正在使用GPU加速") + print("📁 结果图像保存在 test_data/ 目录中") + print("=" * 60) + + +if __name__ == "__main__": + comprehensive_gpu_test() \ No newline at end of file diff --git a/src/cuda_t3.py b/src/cuda_t3.py new file mode 100644 index 0000000..93438f8 --- /dev/null +++ b/src/cuda_t3.py @@ -0,0 +1,200 @@ +# final_gpu_test.py +import torch +import onnxruntime as ort +import insightface +import cv2 +import numpy as np +import time +import os +from typing import List, Dict + + +def comprehensive_gpu_test(): + """全面的GPU测试""" + print("=" * 60) + print("最终GPU加速测试") + print("=" * 60) + + # 1. 环境验证 + print("1. 环境验证:") + print(f"✅ PyTorch: {torch.__version__}") + print(f"✅ PyTorch CUDA: {torch.cuda.is_available()}") + print(f"✅ GPU设备: {torch.cuda.get_device_name(0)}") + print(f"✅ ONNX Runtime GPU支持: {'CUDAExecutionProvider' in ort.get_available_providers()}") + print(f"✅ OpenCV: {cv2.__version__}") + print(f"✅ InsightFace: {insightface.__version__}") + + # 2. 创建测试数据 + print("\n2. 创建测试数据...") + os.makedirs("test_data", exist_ok=True) + + # 创建目标人脸 + target_face = np.random.randint(0, 255, (400, 400, 3), dtype=np.uint8) + cv2.rectangle(target_face, (150, 150), (250, 250), (255, 255, 255), -1) # 简单的人脸模拟 + cv2.imwrite("test_data/target_face.jpg", target_face) + + # 创建测试图像(多个人脸) + test_image = np.random.randint(0, 255, (800, 600, 3), dtype=np.uint8) + # 添加多个人脸区域 + cv2.rectangle(test_image, (100, 100), (200, 200), (255, 255, 255), -1) + cv2.rectangle(test_image, (300, 150), (400, 250), (255, 255, 255), -1) + cv2.rectangle(test_image, (500, 200), (600, 300), (255, 255, 255), -1) + cv2.imwrite("test_data/multi_face_test.jpg", test_image) + + print("✅ 测试数据创建完成") + + # 3. GPU加速的人脸识别系统 + print("\n3. 初始化GPU人脸识别系统...") + + class FastGPUFaceRecognition: + def __init__(self): + self.app = insightface.app.FaceAnalysis(name='buffalo_l') + self.app.prepare( + ctx_id=0, # GPU 0 + det_thresh=0.3, # 检测阈值 + det_size=(640, 640) + ) + self.target_embedding = None + print("✅ GPU人脸识别系统初始化完成") + + def set_target_face(self, image_path: str): + """设置目标人脸""" + img = cv2.imread(image_path) + faces = self.app.get(img) + if faces: + self.target_embedding = faces[0].embedding + print(f"✅ 目标人脸特征提取完成") + return True + return False + + def process_image(self, image_path: str, output_path: str): + """处理图像并返回结果""" + start_time = time.time() + + img = cv2.imread(image_path) + if img is None: + return None + + # GPU推理 + faces = self.app.get(img) + + results = [] + for i, face in enumerate(faces): + similarity = 0.0 + if self.target_embedding is not None: + # 计算相似度 + emb1 = face.embedding / np.linalg.norm(face.embedding) + emb2 = self.target_embedding / np.linalg.norm(self.target_embedding) + similarity = float(np.dot(emb1, emb2)) + + results.append({ + 'face_index': i, + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'det_score': face.det_score + }) + + # 绘制结果 + bbox = face.bbox.astype(int) + color = (0, 255, 0) if similarity > 0.6 else (0, 0, 255) + cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2) + cv2.putText(img, f"{similarity:.3f}", (bbox[0], bbox[1] - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) + + # 保存结果 + cv2.imwrite(output_path, img) + + processing_time = (time.time() - start_time) * 1000 + return { + 'faces_detected': len(faces), + 'processing_time_ms': processing_time, + 'results': results + } + + # 初始化系统 + system = FastGPUFaceRecognition() + + # 4. 性能测试 + print("\n4. 性能测试:") + + # 设置目标人脸 + if system.set_target_face("test_data/target_face.jpg"): + # 处理测试图像 + result = system.process_image("test_data/multi_face_test.jpg", "test_data/gpu_result.jpg") + + if result: + print(f"✅ 处理完成:") + print(f" 检测到人脸数: {result['faces_detected']}") + print(f" 处理时间: {result['processing_time_ms']:.1f}ms") + + for face in result['results']: + status = "匹配" if face['similarity'] > 0.6 else "不匹配" + print(f" 人脸 {face['face_index'] + 1}: 相似度 {face['similarity']:.3f} ({status})") + + # 5. 批量性能测试 + print("\n5. 批量性能测试:") + + # 创建多个测试图像 + test_images = [] + for i in range(5): + test_img = np.random.randint(0, 255, (600, 800, 3), dtype=np.uint8) + cv2.rectangle(test_img, (100, 100), (200, 200), (255, 255, 255), -1) + img_path = f"test_data/batch_test_{i}.jpg" + cv2.imwrite(img_path, test_img) + test_images.append(img_path) + + total_time = 0 + total_faces = 0 + + for i, img_path in enumerate(test_images): + result = system.process_image(img_path, f"test_data/batch_result_{i}.jpg") + if result: + total_time += result['processing_time_ms'] + total_faces += result['faces_detected'] + print(f" 图像 {i + 1}: {result['processing_time_ms']:.1f}ms, {result['faces_detected']}张人脸") + + if len(test_images) > 0: + avg_time = total_time / len(test_images) + print(f" 平均处理时间: {avg_time:.1f}ms/张") + print(f" 总检测人脸: {total_faces}张") + + # 6. 与CPU性能对比(可选) + print("\n6. GPU vs CPU 性能对比:") + try: + # GPU测试 + gpu_times = [] + test_img = cv2.imread("test_data/multi_face_test.jpg") + for _ in range(10): + start = time.time() + faces = system.app.get(test_img) + gpu_times.append((time.time() - start) * 1000) + + # CPU测试(创建新的CPU实例) + cpu_app = insightface.app.FaceAnalysis(name='buffalo_l') + cpu_app.prepare(ctx_id=-1) # CPU + + cpu_times = [] + for _ in range(10): + start = time.time() + faces = cpu_app.get(test_img) + cpu_times.append((time.time() - start) * 1000) + + avg_gpu = np.mean(gpu_times) + avg_cpu = np.mean(cpu_times) + + print(f" GPU平均推理时间: {avg_gpu:.1f}ms") + print(f" CPU平均推理时间: {avg_cpu:.1f}ms") + print(f" GPU加速比: {avg_cpu / avg_gpu:.1f}x") + + except Exception as e: + print(f" 性能对比测试跳过: {e}") + + print("\n" + "=" * 60) + print("🎉 GPU加速测试完成!") + print("✅ 现在您的人脸识别系统正在使用GPU加速") + print("📁 结果图像保存在 test_data/ 目录中") + print("=" * 60) + + +if __name__ == "__main__": + comprehensive_gpu_test() \ No newline at end of file diff --git a/src/face_recognition_system.py b/src/face_recognition_system.py new file mode 100644 index 0000000..53eb211 --- /dev/null +++ b/src/face_recognition_system.py @@ -0,0 +1,249 @@ +import os +import cv2 +import numpy as np +from insightface.app import FaceAnalysis +from insightface.data import get_image as ins_get_image +import pickle +from typing import List, Tuple, Dict +import json + + +class FaceRecognitionSystem: + def __init__(self, model_name: str = 'buffalo_l'): + """ + 初始化人脸识别系统 + Args: + model_name: 模型名称,可选 'buffalo_l', 'buffalo_s' 等 + """ + self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) + self.app.prepare(ctx_id=0, det_size=(640, 640)) + self.face_database = {} # 存储人脸特征向量 + self.database_file = "face_database.pkl" + + # 加载已有数据库 + self.load_database() + + def extract_face_features(self, image_path: str) -> List[Dict]: + """ + 从图像中提取人脸特征 + Args: + image_path: 图像路径 + Returns: + List[Dict]: 包含人脸信息的列表 + """ + img = cv2.imread(image_path) + if img is None: + raise ValueError(f"无法读取图像: {image_path}") + + faces = self.app.get(img) + results = [] + + for i, face in enumerate(faces): + face_info = { + 'bbox': face.bbox.astype(int).tolist(), # 人脸框 + 'kps': face.kps.astype(int).tolist(), # 关键点 + 'embedding': face.embedding.tolist(), # 特征向量 + 'gender': face.gender, # 性别 + 'age': face.age # 年龄 + } + results.append(face_info) + + return results + + def register_face(self, image_path: str, person_id: str): + """ + 注册人脸到数据库 + Args: + image_path: 图像路径 + person_id: 人员ID + """ + faces = self.extract_face_features(image_path) + + if not faces: + print(f"在图像 {image_path} 中未检测到人脸") + return False + + if len(faces) > 1: + print(f"在图像 {image_path} 中检测到多个人脸,将使用第一个人脸") + + # 存储第一个人脸的特征 + self.face_database[person_id] = { + 'embedding': np.array(faces[0]['embedding']), + 'image_path': image_path + } + + self.save_database() + print(f"成功注册人脸: {person_id}") + return True + + def compare_faces(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float: + """ + 计算两个人脸特征的相似度 + Args: + embedding1: 特征向量1 + embedding2: 特征向量2 + Returns: + float: 相似度得分 (0-1之间,越大越相似) + """ + # 计算余弦相似度 + similarity = np.dot(embedding1, embedding2) / ( + np.linalg.norm(embedding1) * np.linalg.norm(embedding2) + ) + return float(similarity) + + def one_vs_one(self, image_path1: str, image_path2: str) -> Tuple[float, bool]: + """ + 1v1人脸比对 + Args: + image_path1: 图像1路径 + image_path2: 图像2路径 + Returns: + Tuple[float, bool]: (相似度得分, 是否同一人) + """ + faces1 = self.extract_face_features(image_path1) + faces2 = self.extract_face_features(image_path2) + + if not faces1 or not faces2: + return 0.0, False + + embedding1 = np.array(faces1[0]['embedding']) + embedding2 = np.array(faces2[0]['embedding']) + + similarity = self.compare_faces(embedding1, embedding2) + is_same = similarity > 0.6 # 阈值可根据实际情况调整 + + return similarity, is_same + + def one_vs_many(self, image_path: str, threshold: float = 0.6) -> List[Tuple[str, float]]: + """ + 1vn人脸检索 + Args: + image_path: 查询图像路径 + threshold: 相似度阈值 + Returns: + List[Tuple[str, float]]: 匹配结果 (人员ID, 相似度) + """ + faces = self.extract_face_features(image_path) + if not faces: + return [] + + query_embedding = np.array(faces[0]['embedding']) + results = [] + + for person_id, data in self.face_database.items(): + similarity = self.compare_faces(query_embedding, data['embedding']) + if similarity > threshold: + results.append((person_id, similarity)) + + # 按相似度降序排序 + results.sort(key=lambda x: x[1], reverse=True) + return results + + def save_database(self): + """保存人脸数据库到文件""" + # 将numpy数组转换为列表以便序列化 + save_data = {} + for person_id, data in self.face_database.items(): + save_data[person_id] = { + 'embedding': data['embedding'].tolist(), + 'image_path': data['image_path'] + } + + with open(self.database_file, 'wb') as f: + pickle.dump(save_data, f) + + def load_database(self): + """从文件加载人脸数据库""" + if os.path.exists(self.database_file): + with open(self.database_file, 'rb') as f: + save_data = pickle.load(f) + + # 将列表转换回numpy数组 + for person_id, data in save_data.items(): + self.face_database[person_id] = { + 'embedding': np.array(data['embedding']), + 'image_path': data['image_path'] + } + print(f"已加载数据库,包含 {len(self.face_database)} 个人脸") + + def visualize_detection(self, image_path: str, save_path: str = None): + """ + 可视化人脸检测结果 + Args: + image_path: 图像路径 + save_path: 保存路径 + """ + img = cv2.imread(image_path) + faces = self.app.get(img) + + for face in faces: + # 绘制人脸框 + bbox = face.bbox.astype(int) + cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2) + + # 绘制关键点 + for kp in face.kps.astype(int): + cv2.circle(img, (kp[0], kp[1]), 2, (0, 0, 255), -1) + + # 显示性别和年龄 + info = f"{'M' if face.gender == 1 else 'F'}/{face.age}" + cv2.putText(img, info, (bbox[0], bbox[1] - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1) + + if save_path: + cv2.imwrite(save_path, img) + + return img + + +# 使用示例 +def main(): + # 初始化系统 + face_system = FaceRecognitionSystem() + + # # 创建测试图像目录 + # os.makedirs("test_images", exist_ok=True) + # + # # 示例1: 注册人脸 + # print("=== 注册人脸 ===") + # # 假设你有一些人脸图像放在 test_images 目录下 + # test_images = { + # "person_001": "test_images/person1.png", + # "person_002": "test_images/person2.jpg", + # # "person_003": "test_images/person3.jpg" + # } + # + # for person_id, img_path in test_images.items(): + # if os.path.exists(img_path): + # face_system.register_face(img_path, person_id) + + # 示例2: 1v1比对 + print("\n=== 1v1人脸比对 ===") + img1 = "test_data/register/person1.png" + # img1 = "test_data/query/file___media_Photo_103_IMG_1737872809_085_IMG_20250126_142509.jpg" + img2 = "test_data/query/file___media_Photo_152_IMG_1737876072_134_IMG_20250126_151932.jpg" + + + if os.path.exists(img1) and os.path.exists(img2): + similarity, is_same = face_system.one_vs_one(img1, img2) + print(f"相似度: {similarity:.4f}, 是否同一人: {is_same}") + + # # 示例3: 1vn检索 + # print("\n=== 1vn人脸检索 ===") + # query_img = "test_images/query.jpg" + # if os.path.exists(query_img): + # results = face_system.one_vs_many(query_img) + # print("检索结果:") + # for person_id, score in results: + # print(f" {person_id}: {score:.4f}") + # + # # 示例4: 可视化检测结果 + # print("\n=== 人脸检测可视化 ===") + # test_img = "test_images/test.jpg" + # if os.path.exists(test_img): + # output_img = face_system.visualize_detection(test_img, "output/detection_result.jpg") + # print("检测结果已保存到 output/detection_result.jpg") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/face_recognition_system_2.py b/src/face_recognition_system_2.py new file mode 100644 index 0000000..6306422 --- /dev/null +++ b/src/face_recognition_system_2.py @@ -0,0 +1,496 @@ +import os +import cv2 +import numpy as np +from insightface.app import FaceAnalysis +import pickle +from typing import List, Tuple, Dict, Optional +from datetime import datetime +import json + + +class MultiFaceRecognitionSystem: + """ + 多人脸识别系统 + 支持多个人脸检测、识别和标注 + """ + + def __init__(self, model_name: str = 'buffalo_l'): + """ + 初始化人脸识别系统 + + Args: + model_name: 模型名称,可选 'buffalo_l', 'buffalo_s' 等 + """ + # 初始化InsightFace + self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) + self.app.prepare(ctx_id=0, det_size=(640, 640)) + + # 人脸数据库 + self.face_database = {} + self.database_file = "face_database.pkl" + + # 加载已有数据库 + self.load_database() + + # 相似度阈值 + self.similarity_threshold = 0.6 + + # 颜色配置 + self.colors = { + 'known': (0, 255, 0), # 绿色 - 已知人脸 + 'unknown': (0, 0, 255), # 红色 - 未知人脸 + 'text': (255, 255, 255), # 白色 - 文字 + 'landmark': (255, 0, 0) # 蓝色 - 关键点 + } + + def register_face(self, image_path: str, person_id: str) -> bool: + """ + 注册单个人脸到数据库 + + Args: + image_path: 图像路径 + person_id: 人员ID + + Returns: + bool: 注册是否成功 + """ + if not os.path.exists(image_path): + print(f"图像文件不存在: {image_path}") + return False + + # 提取人脸特征 + faces = self.extract_face_features(image_path) + + if not faces: + print(f"在图像 {image_path} 中未检测到人脸") + return False + + if len(faces) > 1: + print(f"在图像 {image_path} 中检测到多个人脸,将使用第一个人脸") + + # 存储到数据库 + self.face_database[person_id] = { + 'embedding': np.array(faces[0]['embedding']), + 'image_path': image_path, + 'registration_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + + # 保存数据库 + self.save_database() + print(f"成功注册人脸: {person_id}") + return True + + def batch_register_faces(self, image_dir: str, id_prefix: str = "person") -> int: + """ + 批量注册人脸 + + Args: + image_dir: 图像目录 + id_prefix: ID前缀 + + Returns: + int: 成功注册的数量 + """ + if not os.path.exists(image_dir): + print(f"目录不存在: {image_dir}") + return 0 + + registered_count = 0 + image_files = [f for f in os.listdir(image_dir) + if f.lower().endswith(('.jpg', '.jpeg', '.png'))] + + for i, filename in enumerate(image_files): + image_path = os.path.join(image_dir, filename) + person_id = f"{id_prefix}_{i + 1:03d}" + + if self.register_face(image_path, person_id): + registered_count += 1 + + print(f"批量注册完成: {registered_count}/{len(image_files)} 个人脸") + return registered_count + + def extract_face_features(self, image_path: str) -> List[Dict]: + """ + 从图像中提取所有人脸特征 + + Args: + image_path: 图像路径 + + Returns: + List[Dict]: 包含多个人脸信息的列表 + """ + img = cv2.imread(image_path) + if img is None: + raise ValueError(f"无法读取图像: {image_path}") + + faces = self.app.get(img) + results = [] + + for i, face in enumerate(faces): + face_info = { + 'bbox': face.bbox.astype(int).tolist(), # 人脸框 [x1, y1, x2, y2] + 'kps': face.kps.astype(int).tolist(), # 关键点 [[x1,y1], [x2,y2], ...] + 'embedding': face.embedding.tolist(), # 特征向量 + 'gender': 'Male' if face.gender == 1 else 'Female', + 'age': int(face.age), + 'det_score': float(face.det_score) # 检测置信度 + } + results.append(face_info) + + return results + + def compare_faces(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float: + """ + 计算两个人脸特征的相似度 + + Args: + embedding1: 特征向量1 + embedding2: 特征向量2 + + Returns: + float: 余弦相似度 (0-1) + """ + # 归一化向量 + embedding1 = embedding1 / np.linalg.norm(embedding1) + embedding2 = embedding2 / np.linalg.norm(embedding2) + + # 计算余弦相似度 + similarity = np.dot(embedding1, embedding2) + return float(similarity) + + def search_face(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]: + """ + 在数据库中搜索最相似的人脸 + + Args: + query_embedding: 查询特征向量 + top_k: 返回最相似的k个结果 + + Returns: + List[Tuple[str, float]]: (人员ID, 相似度) + """ + if not self.face_database: + return [] + + results = [] + query_embedding = query_embedding / np.linalg.norm(query_embedding) + + for person_id, data in self.face_database.items(): + db_embedding = data['embedding'] / np.linalg.norm(data['embedding']) + similarity = np.dot(query_embedding, db_embedding) + + if similarity > self.similarity_threshold: + results.append((person_id, float(similarity))) + + # 按相似度降序排序 + results.sort(key=lambda x: x[1], reverse=True) + + # 返回top_k个结果 + return results[:top_k] + + def recognize_faces_in_image(self, image_path: str, output_path: str = None, + draw_landmarks: bool = True, draw_info: bool = True) -> Dict: + """ + 识别图像中的所有脸并进行标注 + + Args: + image_path: 输入图像路径 + output_path: 输出图像路径 + draw_landmarks: 是否绘制关键点 + draw_info: 是否绘制详细信息 + + Returns: + Dict: 识别结果 + """ + # 读取图像 + img = cv2.imread(image_path) + if img is None: + raise ValueError(f"无法读取图像: {image_path}") + + # 提取人脸特征 + faces = self.extract_face_features(image_path) + recognition_results = { + 'image_path': image_path, + 'faces_detected': len(faces), + 'recognitions': [] + } + + # 对每个检测到的人脸进行识别 + for i, face in enumerate(faces): + # 在数据库中搜索 + query_embedding = np.array(face['embedding']) + search_results = self.search_face(query_embedding, top_k=1) + + # 识别结果 + person_id = "Unknown" + similarity = 0.0 + + if search_results: + person_id, similarity = search_results[0] + + # 存储识别结果 + face_recognition = { + 'face_index': i, + 'bbox': face['bbox'], + 'person_id': person_id, + 'similarity': similarity, + 'gender': face['gender'], + 'age': face['age'], + 'det_score': face['det_score'] + } + recognition_results['recognitions'].append(face_recognition) + + # 在图像上绘制标注 + self._draw_face_annotation(img, face_recognition, draw_landmarks, draw_info) + + # 保存输出图像 + if output_path: + os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True) + cv2.imwrite(output_path, img) + print(f"标注图像已保存: {output_path}") + + return recognition_results + + def _draw_face_annotation(self, img: np.ndarray, face_info: Dict, + draw_landmarks: bool, draw_info: bool): + """ + 在图像上绘制人脸标注 + + Args: + img: 图像数组 + face_info: 人脸信息 + draw_landmarks: 是否绘制关键点 + draw_info: 是否绘制详细信息 + """ + bbox = face_info['bbox'] # [x1, y1, x2, y2] + person_id = face_info['person_id'] + similarity = face_info['similarity'] + + # 确定颜色:已知人脸用绿色,未知用红色 + if person_id != "Unknown": + color = self.colors['known'] + label = f"{person_id} ({similarity:.3f})" + else: + color = self.colors['unknown'] + label = "Unknown" + + # 绘制人脸框 + cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2) + + # 绘制关键点 + if draw_landmarks and 'kps' in face_info: + for kp in face_info['kps']: + cv2.circle(img, (kp[0], kp[1]), 2, self.colors['landmark'], -1) + + # 绘制信息文本 + if draw_info: + # 基础信息 + text_y = bbox[1] - 10 + if text_y < 20: + text_y = bbox[3] + 20 + + cv2.putText(img, label, (bbox[0], text_y), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.colors['text'], 2) + + # 详细信息 + detail_text = f"{face_info['gender']}/{face_info['age']}" + cv2.putText(img, detail_text, (bbox[0], text_y + 25), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['text'], 1) + + def batch_process_images(self, input_dir: str, output_dir: str, + image_extensions: tuple = ('.jpg', '.jpeg', '.png')) -> Dict: + """ + 批量处理目录中的所有图像 + + Args: + input_dir: 输入图像目录 + output_dir: 输出图像目录 + image_extensions: 图像文件扩展名 + + Returns: + Dict: 批量处理结果统计 + """ + if not os.path.exists(input_dir): + raise ValueError(f"输入目录不存在: {input_dir}") + + os.makedirs(output_dir, exist_ok=True) + + # 获取所有图像文件 + image_files = [] + for ext in image_extensions: + image_files.extend([f for f in os.listdir(input_dir) if f.lower().endswith(ext)]) + + print(f"找到 {len(image_files)} 个图像文件") + + # 处理统计 + stats = { + 'total_images': len(image_files), + 'processed_images': 0, + 'total_faces': 0, + 'recognized_faces': 0, + 'results': [] + } + + # 批量处理 + for image_file in image_files: + input_path = os.path.join(input_dir, image_file) + output_path = os.path.join(output_dir, f"annotated_{image_file}") + + try: + # 处理单张图像 + result = self.recognize_faces_in_image(input_path, output_path) + + # 更新统计 + stats['processed_images'] += 1 + stats['total_faces'] += result['faces_detected'] + + recognized = sum(1 for face in result['recognitions'] if face['person_id'] != "Unknown") + stats['recognized_faces'] += recognized + + stats['results'].append({ + 'image_file': image_file, + 'faces_detected': result['faces_detected'], + 'recognized_faces': recognized + }) + + print(f"处理完成: {image_file} -> 检测到 {result['faces_detected']} 张脸, 识别出 {recognized} 张") + + except Exception as e: + print(f"处理图像 {image_file} 时出错: {e}") + + # 保存处理统计 + stats_path = os.path.join(output_dir, "processing_stats.json") + with open(stats_path, 'w', encoding='utf-8') as f: + json.dump(stats, f, indent=2, ensure_ascii=False) + + print(f"批量处理完成! 统计信息已保存: {stats_path}") + return stats + + def save_database(self): + """保存人脸数据库到文件""" + # 转换为可序列化的格式 + save_data = {} + for person_id, data in self.face_database.items(): + save_data[person_id] = { + 'embedding': data['embedding'].tolist(), + 'image_path': data['image_path'], + 'registration_time': data.get('registration_time', 'Unknown') + } + + with open(self.database_file, 'wb') as f: + pickle.dump(save_data, f) + + print(f"人脸数据库已保存: {self.database_file} (共 {len(self.face_database)} 人)") + + def load_database(self): + """从文件加载人脸数据库""" + if os.path.exists(self.database_file): + try: + with open(self.database_file, 'rb') as f: + save_data = pickle.load(f) + + # 转换回numpy数组 + for person_id, data in save_data.items(): + self.face_database[person_id] = { + 'embedding': np.array(data['embedding']), + 'image_path': data['image_path'], + 'registration_time': data.get('registration_time', 'Unknown') + } + + print(f"人脸数据库已加载: {len(self.face_database)} 人") + except Exception as e: + print(f"加载数据库失败: {e}") + self.face_database = {} + else: + print("数据库文件不存在,将创建新数据库") + self.face_database = {} + + def get_database_info(self) -> Dict: + """获取数据库信息""" + return { + 'total_persons': len(self.face_database), + 'person_ids': list(self.face_database.keys()), + 'database_file': self.database_file + } + + def set_similarity_threshold(self, threshold: float): + """设置相似度阈值""" + if 0 <= threshold <= 1: + self.similarity_threshold = threshold + print(f"相似度阈值已设置为: {threshold}") + else: + print("阈值必须在 0 到 1 之间") + + +# 使用示例和演示 +def demo_usage(): + """演示使用方法""" + + # 创建人脸识别系统 + face_system = MultiFaceRecognitionSystem() + + # 1. 注册人脸到数据库 + print("=== 注册人脸 ===") + + # 创建测试数据目录 + os.makedirs("test_data/register", exist_ok=True) + os.makedirs("test_data/query", exist_ok=True) + os.makedirs("test_data/output", exist_ok=True) + + # 假设你有一些人脸图像用于注册 + # face_system.register_face("test_data/register/person1.jpg", "alice") + # face_system.register_face("test_data/register/person2.jpg", "bob") + # face_system.register_face("test_data/register/person3.jpg", "charlie") + + # # 或者批量注册 + # face_system.batch_register_faces("test_data/register", "person") + + # 2. 查看数据库信息 + print("\n=== 数据库信息 ===") + db_info = face_system.get_database_info() + print(f"数据库人数: {db_info['total_persons']}") + print(f"人员ID: {db_info['person_ids']}") + + # 3. 单张图像识别示例 + print("\n=== 单张图像识别 ===") + query_image = "test_data/query/multi_face.jpg" # 包含多个人的图像 + + if os.path.exists(query_image): + result = face_system.recognize_faces_in_image( + image_path=query_image, + output_path="test_data/output/annotated_multi_face.jpg", + draw_landmarks=True, + draw_info=True + ) + + print(f"检测到 {result['faces_detected']} 张人脸:") + for face in result['recognitions']: + status = "已知" if face['person_id'] != "Unknown" else "未知" + print(f" 人脸 {face['face_index'] + 1}: {face['person_id']} " + f"(相似度: {face['similarity']:.3f}, {face['gender']}/{face['age']}岁) - {status}") + else: + print(f"查询图像不存在: {query_image}") + print("请将包含多人脸的图像放在 test_data/query/ 目录下") + + # 4. 批量处理示例 + print("\n=== 批量处理图像 ===") + if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0: + stats = face_system.batch_process_images( + input_dir="test_data/query", + output_dir="test_data/output/batch_results" + ) + + print(f"批量处理统计:") + print(f" 总图像数: {stats['total_images']}") + print(f" 成功处理: {stats['processed_images']}") + print(f" 总人脸数: {stats['total_faces']}") + print(f" 识别出的人脸: {stats['recognized_faces']}") + + # 5. 调整相似度阈值 + print("\n=== 调整阈值 ===") + face_system.set_similarity_threshold(0.1) # 降低阈值,更宽松 + # face_system.set_similarity_threshold(0.7) # 提高阈值,更严格 + + +if __name__ == "__main__": + demo_usage() \ No newline at end of file diff --git a/src/face_recognition_system_3.py b/src/face_recognition_system_3.py new file mode 100644 index 0000000..a9a7144 --- /dev/null +++ b/src/face_recognition_system_3.py @@ -0,0 +1,481 @@ +import os +import cv2 +import numpy as np +from insightface.app import FaceAnalysis +import pickle +from typing import List, Tuple, Dict, Optional +from datetime import datetime +import json + + +class SingleFaceComparisonSystem: + """ + 单人脸比对系统 + 在图像中检测多个人脸,并与单个目标人脸进行比对 + """ + + def __init__(self, model_name: str = 'buffalo_l'): + """ + 初始化人脸比对系统 + + Args: + model_name: 模型名称,可选 'buffalo_l', 'buffalo_s' 等 + """ + # 初始化InsightFace + self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) + self.app.prepare(ctx_id=0, + det_thresh=0.35, # 降低阈值,检测更多模糊/小脸 + det_size=(640, 640)) + + # 目标人脸特征 + self.target_embedding = None + self.target_person_id = None + self.target_image_path = None + + # 相似度阈值 + self.similarity_threshold = 0.25 + + # 颜色配置 + self.colors = { + 'match_high': (0, 255, 0), # 绿色 - 高相似度 + 'match_medium': (0, 255, 255), # 黄色 - 中等相似度 + 'match_low': (0, 165, 255), # 橙色 - 低相似度 + 'no_match': (0, 0, 255), # 红色 - 不匹配 + 'text': (255, 255, 255), # 白色 - 文字 + 'landmark': (255, 0, 0) # 蓝色 - 关键点 + } + + def set_target_face(self, image_path: str, person_id: str = "target_person") -> bool: + """ + 设置目标人脸 + + Args: + image_path: 目标人脸图像路径 + person_id: 目标人员ID + + Returns: + bool: 设置是否成功 + """ + if not os.path.exists(image_path): + print(f"目标图像文件不存在: {image_path}") + return False + + # 提取人脸特征 + faces = self.extract_face_features(image_path) + + if not faces: + print(f"在目标图像 {image_path} 中未检测到人脸") + return False + + if len(faces) > 1: + print(f"在目标图像 {image_path} 中检测到多个人脸,将使用第一个人脸") + + # 存储目标人脸特征 + self.target_embedding = np.array(faces[0]['embedding']) + self.target_person_id = person_id + self.target_image_path = image_path + + print(f"成功设置目标人脸: {person_id}") + return True + + def extract_face_features(self, image_path: str) -> List[Dict]: + """ + 从图像中提取所有人脸特征 + + Args: + image_path: 图像路径 + + Returns: + List[Dict]: 包含多个人脸信息的列表 + """ + img = cv2.imread(image_path) + if img is None: + raise ValueError(f"无法读取图像: {image_path}") + + faces = self.app.get(img) + results = [] + + for i, face in enumerate(faces): + face_info = { + 'bbox': face.bbox.astype(int).tolist(), # 人脸框 [x1, y1, x2, y2] + 'kps': face.kps.astype(int).tolist(), # 关键点 [[x1,y1], [x2,y2], ...] + 'embedding': face.embedding.tolist(), # 特征向量 + 'gender': 'Male' if face.gender == 1 else 'Female', + 'age': int(face.age), + 'det_score': float(face.det_score) # 检测置信度 + } + results.append(face_info) + + return results + + def compare_with_target(self, query_embedding: np.ndarray) -> float: + """ + 与目标人脸进行比对 + + Args: + query_embedding: 查询特征向量 + + Returns: + float: 相似度 (0-1) + """ + if self.target_embedding is None: + raise ValueError("未设置目标人脸") + + # 归一化向量 + query_embedding = query_embedding / np.linalg.norm(query_embedding) + target_embedding = self.target_embedding / np.linalg.norm(self.target_embedding) + + # 计算余弦相似度 + similarity = np.dot(query_embedding, target_embedding) + return float(similarity) + + def get_similarity_color(self, similarity: float) -> tuple: + """ + 根据相似度获取对应的颜色 + + Args: + similarity: 相似度 (0-1) + + Returns: + tuple: BGR颜色值 + """ + if similarity >= self.similarity_threshold: + return self.colors['match_high'] # 高相似度 - 绿色 + elif similarity >= self.similarity_threshold/2: + return self.colors['match_low'] # 低相似度 - 橙色 + else: + return self.colors['no_match'] # 不匹配 - 红色 + + def process_image_with_target_comparison(self, image_path: str, output_path: str = None, + draw_landmarks: bool = True, draw_info: bool = True) -> Dict: + """ + 处理图像并与目标人脸进行比对 + + Args: + image_path: 输入图像路径 + output_path: 输出图像路径 + draw_landmarks: 是否绘制关键点 + draw_info: 是否绘制详细信息 + + Returns: + Dict: 处理结果 + """ + if self.target_embedding is None: + raise ValueError("请先设置目标人脸") + + # 读取图像 + img = cv2.imread(image_path) + if img is None: + raise ValueError(f"无法读取图像: {image_path}") + + # 提取人脸特征 + faces = self.extract_face_features(image_path) + comparison_results = { + 'image_path': image_path, + 'target_person_id': self.target_person_id, + 'faces_detected': len(faces), + 'comparisons': [] + } + + # 对每个检测到的人脸进行比对 + for i, face in enumerate(faces): + # 与目标人脸比对 + query_embedding = np.array(face['embedding']) + similarity = self.compare_with_target(query_embedding) + + # 存储比对结果 + face_comparison = { + 'face_index': i, + 'bbox': face['bbox'], + 'similarity': similarity, + 'is_match': similarity >= self.similarity_threshold, + 'gender': face['gender'], + 'age': face['age'], + 'det_score': face['det_score'] + } + comparison_results['comparisons'].append(face_comparison) + + # 在图像上绘制标注 + self._draw_face_comparison_annotation(img, face_comparison, draw_landmarks, draw_info) + + # 保存输出图像 + if output_path: + os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True) + cv2.imwrite(output_path, img) + print(f"比对结果图像已保存: {output_path}") + + return comparison_results + + def _draw_face_comparison_annotation(self, img: np.ndarray, face_info: Dict, + draw_landmarks: bool, draw_info: bool): + """ + 在图像上绘制人脸比对标注 + + Args: + img: 图像数组 + face_info: 人脸比对信息 + draw_landmarks: 是否绘制关键点 + draw_info: 是否绘制详细信息 + """ + bbox = face_info['bbox'] # [x1, y1, x2, y2] + similarity = face_info['similarity'] + is_match = face_info['is_match'] + + # 根据相似度确定颜色 + color = self.get_similarity_color(similarity) + + # 绘制人脸框 + thickness = 3 if is_match else 2 + cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, thickness) + + # 绘制关键点 + if draw_landmarks and 'kps' in face_info: + for kp in face_info['kps']: + cv2.circle(img, (kp[0], kp[1]), 2, self.colors['landmark'], -1) + + # 绘制信息文本 + if draw_info: + # 相似度信息 + text_y = bbox[1] - 10 + if text_y < 20: + text_y = bbox[3] + 20 + + similarity_text = f"Similarity: {similarity:.3f}" + match_status = "MATCH" if is_match else "NO MATCH" + + cv2.putText(img, similarity_text, (bbox[0], text_y), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.colors['text'], 2) + + # 匹配状态 + cv2.putText(img, match_status, (bbox[0], text_y + 25), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) + + # 人脸属性信息 + detail_text = f"{face_info['gender']}/{face_info['age']}" + cv2.putText(img, detail_text, (bbox[0], text_y + 50), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['text'], 1) + + def batch_process_with_target(self, input_dir: str, output_dir: str, + image_extensions: tuple = ('.jpg', '.jpeg', '.png')) -> Dict: + """ + 批量处理目录中的所有图像,与目标人脸进行比对 + + Args: + input_dir: 输入图像目录 + output_dir: 输出图像目录 + image_extensions: 图像文件扩展名 + + Returns: + Dict: 批量处理结果统计 + """ + if self.target_embedding is None: + raise ValueError("请先设置目标人脸") + + if not os.path.exists(input_dir): + raise ValueError(f"输入目录不存在: {input_dir}") + + os.makedirs(output_dir, exist_ok=True) + + # 获取所有图像文件 + image_files = [] + for ext in image_extensions: + image_files.extend([f for f in os.listdir(input_dir) if f.lower().endswith(ext)]) + + print(f"找到 {len(image_files)} 个图像文件") + + # 处理统计 + stats = { + 'target_person_id': self.target_person_id, + 'total_images': len(image_files), + 'processed_images': 0, + 'total_faces': 0, + 'matched_faces': 0, + 'max_similarity': 0.0, + 'min_similarity': 1.0, + 'avg_similarity': 0.0, + 'results': [] + } + + all_similarities = [] + + # 批量处理 + for image_file in image_files: + input_path = os.path.join(input_dir, image_file) + output_path = os.path.join(output_dir, f"compared_{image_file}") + + try: + # 处理单张图像 + result = self.process_image_with_target_comparison(input_path, output_path) + + # 更新统计 + stats['processed_images'] += 1 + stats['total_faces'] += result['faces_detected'] + + image_similarities = [comp['similarity'] for comp in result['comparisons']] + all_similarities.extend(image_similarities) + + if image_similarities: + stats['max_similarity'] = max(stats['max_similarity'], max(image_similarities)) + stats['min_similarity'] = min(stats['min_similarity'], min(image_similarities)) + + matched_count = sum(1 for comp in result['comparisons'] if comp['is_match']) + stats['matched_faces'] += matched_count + + stats['results'].append({ + 'image_file': image_file, + 'faces_detected': result['faces_detected'], + 'matched_faces': matched_count, + 'max_similarity': max(image_similarities) if image_similarities else 0, + 'min_similarity': min(image_similarities) if image_similarities else 0 + }) + + print(f"处理完成: {image_file} -> 检测到 {result['faces_detected']} 张脸, " + f"匹配 {matched_count} 张, 最高相似度: {max(image_similarities) if image_similarities else 0:.3f}") + + except Exception as e: + print(f"处理图像 {image_file} 时出错: {e}") + + # 计算平均相似度 + if all_similarities: + stats['avg_similarity'] = sum(all_similarities) / len(all_similarities) + + # 保存处理统计 + stats_path = os.path.join(output_dir, "comparison_stats.json") + with open(stats_path, 'w', encoding='utf-8') as f: + json.dump(stats, f, indent=2, ensure_ascii=False) + + print(f"批量比对完成! 统计信息已保存: {stats_path}") + return stats + + def set_similarity_threshold(self, threshold: float): + """设置相似度阈值""" + if 0 <= threshold <= 1: + self.similarity_threshold = threshold + print(f"相似度阈值已设置为: {threshold}") + else: + print("阈值必须在 0 到 1 之间") + + def get_target_info(self) -> Dict: + """获取目标人脸信息""" + if self.target_embedding is None: + return {"status": "未设置目标人脸"} + + return { + "status": "已设置", + "person_id": self.target_person_id, + "image_path": self.target_image_path, + "similarity_threshold": self.similarity_threshold + } + + +# 使用示例和演示 +def demo_usage(): + """演示使用方法""" + + # 创建人脸比对系统 + face_system = SingleFaceComparisonSystem() + + # 1. 设置目标人脸 + print("=== 设置目标人脸 ===") + + # 创建测试数据目录 + os.makedirs("test_data/target", exist_ok=True) + os.makedirs("test_data/query", exist_ok=True) + os.makedirs("test_data/output", exist_ok=True) + + target_image = "test_data/register/person2.jpg" + # target_image = "test_data/register/person1.png" + + + # 设置目标人脸 + if os.path.exists(target_image): + face_system.set_target_face(target_image, "target_person") + else: + print(f"目标图像不存在: {target_image}") + print("请将目标人脸图像放在 test_data/target/ 目录下") + return + + # 2. 查看目标信息 + print("\n=== 目标人脸信息 ===") + target_info = face_system.get_target_info() + print(f"目标人员ID: {target_info['person_id']}") + print(f"目标图像路径: {target_info['image_path']}") + print(f"相似度阈值: {target_info['similarity_threshold']}") + + # 3. 单张图像比对示例 + print("\n=== 单张图像比对 ===") + query_image = "test_data/query/multi_face.jpg" # 包含多个人的图像 + + if os.path.exists(query_image): + result = face_system.process_image_with_target_comparison( + image_path=query_image, + output_path="test_data/output/compared_multi_face.jpg", + draw_landmarks=True, + draw_info=True + ) + + print(f"检测到 {result['faces_detected']} 张人脸:") + for face in result['comparisons']: + status = "匹配" if face['is_match'] else "不匹配" + color_name = "绿色" if face['similarity'] >= 0.8 else \ + "黄色" if face['similarity'] >= 0.6 else \ + "橙色" if face['similarity'] >= 0.4 else "红色" + + print(f" 人脸 {face['face_index'] + 1}: 相似度 {face['similarity']:.3f} " + f"({color_name}, {status}), {face['gender']}/{face['age']}岁") + else: + print(f"查询图像不存在: {query_image}") + print("请将包含多人脸的图像放在 test_data/query/ 目录下") + + # 4. 批量处理示例 + print("\n=== 批量比对图像 ===") + if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0: + stats = face_system.batch_process_with_target( + input_dir="test_data/query", + output_dir="test_data/output/batch_comparison" + ) + + print(f"批量比对统计:") + print(f" 目标人员: {stats['target_person_id']}") + print(f" 总图像数: {stats['total_images']}") + print(f" 成功处理: {stats['processed_images']}") + print(f" 总人脸数: {stats['total_faces']}") + print(f" 匹配人脸: {stats['matched_faces']}") + print(f" 最高相似度: {stats['max_similarity']:.3f}") + print(f" 最低相似度: {stats['min_similarity']:.3f}") + print(f" 平均相似度: {stats['avg_similarity']:.3f}") + + # 5. 调整相似度阈值 + print("\n=== 调整阈值 ===") + face_system.set_similarity_threshold(0.25) # 提高阈值,更严格 + + +# 高级使用示例 +def advanced_usage(): + """高级使用示例""" + face_system = SingleFaceComparisonSystem() + + # 设置目标人脸 + face_system.set_target_face("path/to/target_face.jpg", "suspect_A") + + # 设置更严格的阈值 + face_system.set_similarity_threshold(0.75) + + # 处理单个图像并保存结果 + result = face_system.process_image_with_target_comparison( + image_path="path/to/surveillance_image.jpg", + output_path="path/to/analyzed_image.jpg" + ) + + # 输出详细信息 + print(f"分析完成: {result['image_path']}") + print(f"目标人员: {result['target_person_id']}") + print(f"检测到人脸: {result['faces_detected']}") + + for comp in result['comparisons']: + print(f"人脸 {comp['face_index'] + 1}: 相似度={comp['similarity']:.3f}, " + f"匹配={comp['is_match']}, 位置={comp['bbox']}") + + +if __name__ == "__main__": + demo_usage() \ No newline at end of file