完成初步的人脸特征值获取与比对,完成结果显示

This commit is contained in:
zqc
2025-11-10 15:24:49 +08:00
parent 280656e341
commit ecfeb1fd6c
6 changed files with 1868 additions and 0 deletions

242
src/cuda_t.py Normal file
View File

@@ -0,0 +1,242 @@
import torch
import onnxruntime as ort
import insightface
import subprocess
import sys
import os
def detailed_diagnosis():
"""详细诊断脚本"""
print("=" * 60)
print("详细CUDA诊断")
print("=" * 60)
# 1. 系统信息
print("\n📋 1. 系统信息:")
print(f"Python版本: {sys.version}")
print(f"Python路径: {sys.executable}")
print(f"Conda环境: {sys.prefix}")
# 2. PyTorch详细信息
print("\n🔥 2. PyTorch详细信息:")
print(f"PyTorch版本: {torch.__version__}")
print(f"PyTorch路径: {torch.__file__}")
print(f"CUDA可用: {torch.cuda.is_available()}")
if torch.cuda.is_available():
print(f"CUDA版本: {torch.version.cuda}")
print(f"GPU数量: {torch.cuda.device_count()}")
for i in range(torch.cuda.device_count()):
print(f" GPU {i}: {torch.cuda.get_device_name(i)}")
print(f" 内存: {torch.cuda.get_device_properties(i).total_memory / 1024 ** 3:.1f} GB")
else:
print("❌ PyTorch无法使用CUDA")
# 检查可能的原因
print("\n🔍 PyTorch CUDA问题排查:")
print(f" torch.cuda.is_available(): {torch.cuda.is_available()}")
try:
print(f" CUDA设备数量: {torch.cuda.device_count()}")
except:
print(" CUDA设备数量: 无法获取")
# 3. ONNX Runtime详细信息
print("\n⚡ 3. ONNX Runtime详细信息:")
print(f"ONNX Runtime版本: {ort.__version__}")
print(f"ONNX Runtime路径: {ort.__file__}")
available_providers = ort.get_available_providers()
print(f"可用Providers: {available_providers}")
if 'CUDAExecutionProvider' in available_providers:
print("✅ CUDAExecutionProvider可用")
# 测试CUDA provider
try:
options = ort.SessionOptions()
session = ort.InferenceSession(
os.path.join(os.path.dirname(insightface.__file__), 'models', 'buffalo_l', '1k3d68.onx'),
providers=['CUDAExecutionProvider'],
sess_options=options
)
print("✅ CUDAExecutionProvider测试通过")
except Exception as e:
print(f"❌ CUDAExecutionProvider测试失败: {e}")
else:
print("❌ CUDAExecutionProvider不可用")
# 4. InsightFace信息
print("\n👁️ 4. InsightFace信息:")
print(f"InsightFace版本: {insightface.__version__}")
print(f"InsightFace路径: {insightface.__file__}")
# 5. 系统CUDA检查
print("\n🖥️ 5. 系统CUDA检查:")
try:
# 检查nvidia-smi
result = subprocess.run(['nvidia-smi'], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
print("✅ nvidia-smi可用")
# 提取关键信息
lines = result.stdout.split('\n')
for line in lines:
if 'Driver Version' in line:
print(f" 驱动版本: {line.strip()}")
if 'CUDA Version' in line:
print(f" CUDA版本: {line.strip()}")
else:
print("❌ nvidia-smi不可用")
except (subprocess.TimeoutExpired, FileNotFoundError, Exception) as e:
print(f"❌ nvidia-smi执行失败: {e}")
try:
# 检查nvcc
result = subprocess.run(['nvcc', '--version'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
print("✅ nvcc可用")
version_line = result.stdout.split('\n')[3] if len(result.stdout.split('\n')) > 3 else result.stdout
print(f" {version_line.strip()}")
else:
print("❌ nvcc不可用")
except (subprocess.TimeoutExpired, FileNotFoundError, Exception) as e:
print(f"❌ nvcc执行失败: {e}")
# 6. 环境变量检查
print("\n🌍 6. 环境变量检查:")
cuda_paths = []
for key, value in os.environ.items():
if 'CUDA' in key.upper() or 'CUDNN' in key.upper():
print(f" {key}: {value}")
if 'PATH' in key or 'HOME' in key:
cuda_paths.append((key, value))
# 7. 包版本兼容性检查
print("\n📦 7. 包版本兼容性检查:")
try:
import pkg_resources
packages = ['torch', 'torchvision', 'torchaudio', 'onnxruntime', 'insightface', 'opencv-python', 'numpy']
for pkg in packages:
try:
version = pkg_resources.get_distribution(pkg).version
print(f" {pkg}: {version}")
except:
print(f" {pkg}: 未安装")
except:
print(" 无法检查包版本")
# 8. 实际性能测试
print("\n🚀 8. 实际性能测试:")
try:
# 测试InsightFace实际使用
app_cpu = insightface.app.FaceAnalysis(name='buffalo_l')
app_cpu.prepare(ctx_id=-1) # 强制CPU
app_gpu = insightface.app.FaceAnalysis(name='buffalo_l')
app_gpu.prepare(ctx_id=0) # 强制GPU
# 创建测试图像
import numpy as np
import cv2
test_img = np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8)
# CPU测试
import time
print(" CPU测试...")
start_time = time.time()
for _ in range(5):
faces_cpu = app_cpu.get(test_img)
cpu_time = (time.time() - start_time) * 1000 / 5
# GPU测试
print(" GPU测试...")
start_time = time.time()
for _ in range(5):
faces_gpu = app_gpu.get(test_img)
gpu_time = (time.time() - start_time) * 1000 / 5
print(f" CPU平均时间: {cpu_time:.1f}ms")
print(f" GPU平均时间: {gpu_time:.1f}ms")
if gpu_time < cpu_time * 0.8: # GPU应该比CPU快
print(" ✅ GPU加速生效")
else:
print(" ⚠️ GPU加速未生效或效果不明显")
except Exception as e:
print(f" ❌ 性能测试失败: {e}")
def check_package_installation():
"""检查包安装情况"""
print("\n" + "=" * 60)
print("包安装检查")
print("=" * 60)
packages = {
'torch': 'PyTorch (深度学习框架)',
'torchvision': 'PyTorch视觉库',
'torchaudio': 'PyTorch音频库',
'onnxruntime': 'ONNX Runtime (推理引擎)',
'insightface': '人脸识别库',
'opencv-python': 'OpenCV (图像处理)',
'numpy': '数值计算库'
}
for pkg, desc in packages.items():
try:
if pkg == 'torch':
import torch
version = torch.__version__
cuda_status = "✅ CUDA可用" if torch.cuda.is_available() else "❌ CUDA不可用"
print(f"{pkg} ({desc}): {version} {cuda_status}")
elif pkg == 'onnxruntime':
import onnxruntime as ort
version = ort.__version__
providers = ort.get_available_providers()
cuda_status = "✅ 有GPU支持" if 'CUDAExecutionProvider' in providers else "❌ 无GPU支持"
print(f"{pkg} ({desc}): {version} {cuda_status}")
else:
module = __import__(pkg)
version = getattr(module, '__version__', '未知版本')
print(f"{pkg} ({desc}): {version}")
except ImportError:
print(f"{pkg} ({desc}): ❌ 未安装")
if __name__ == "__main__":
detailed_diagnosis()
check_package_installation()
# 提供解决方案
print("\n" + "=" * 60)
print("解决方案建议")
print("=" * 60)
# 基于诊断结果给出建议
if not torch.cuda.is_available():
print("\n❌ 主要问题: PyTorch没有CUDA支持")
print("💡 解决方案:")
print("1. 完全卸载当前PyTorch:")
print(" pip uninstall torch torchvision torchaudio")
print("2. 安装GPU版本的PyTorch:")
print(" pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118")
if 'CUDAExecutionProvider' not in ort.get_available_providers():
print("\n❌ 主要问题: ONNX Runtime没有GPU支持")
print("💡 解决方案:")
print("1. 卸载CPU版本:")
print(" pip uninstall onnxruntime")
print("2. 安装GPU版本:")
print(" pip install onnxruntime-gpu")
# 通用建议
print("\n🔄 通用建议:")
print("1. 创建全新的conda环境:")
print(" conda create -n face_gpu python=3.10")
print(" conda activate face_gpu")
print("2. 按顺序安装:")
print(" pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118")
print(" pip install onnxruntime-gpu")
print(" pip install insightface opencv-python")
print("3. 验证安装:")
print(
" python -c \"import torch; print(f'PyTorch CUDA: {torch.cuda.is_available()}'); import onnxruntime as ort; print(f'ONNX Providers: {ort.get_available_providers()}')\"")

200
src/cuda_t2.py Normal file
View File

@@ -0,0 +1,200 @@
# final_gpu_test.py
import torch
import onnxruntime as ort
import insightface
import cv2
import numpy as np
import time
import os
from typing import List, Dict
def comprehensive_gpu_test():
"""全面的GPU测试"""
print("=" * 60)
print("最终GPU加速测试")
print("=" * 60)
# 1. 环境验证
print("1. 环境验证:")
print(f"✅ PyTorch: {torch.__version__}")
print(f"✅ PyTorch CUDA: {torch.cuda.is_available()}")
print(f"✅ GPU设备: {torch.cuda.get_device_name(0)}")
print(f"✅ ONNX Runtime GPU支持: {'CUDAExecutionProvider' in ort.get_available_providers()}")
print(f"✅ OpenCV: {cv2.__version__}")
print(f"✅ InsightFace: {insightface.__version__}")
# 2. 创建测试数据
print("\n2. 创建测试数据...")
os.makedirs("test_data", exist_ok=True)
# 创建目标人脸
target_face = np.random.randint(0, 255, (400, 400, 3), dtype=np.uint8)
cv2.rectangle(target_face, (150, 150), (250, 250), (255, 255, 255), -1) # 简单的人脸模拟
cv2.imwrite("test_data/target_face.jpg", target_face)
# 创建测试图像(多个人脸)
test_image = np.random.randint(0, 255, (800, 600, 3), dtype=np.uint8)
# 添加多个人脸区域
cv2.rectangle(test_image, (100, 100), (200, 200), (255, 255, 255), -1)
cv2.rectangle(test_image, (300, 150), (400, 250), (255, 255, 255), -1)
cv2.rectangle(test_image, (500, 200), (600, 300), (255, 255, 255), -1)
cv2.imwrite("test_data/multi_face_test.jpg", test_image)
print("✅ 测试数据创建完成")
# 3. GPU加速的人脸识别系统
print("\n3. 初始化GPU人脸识别系统...")
class FastGPUFaceRecognition:
def __init__(self):
self.app = insightface.app.FaceAnalysis(name='buffalo_l')
self.app.prepare(
ctx_id=0, # GPU 0
det_thresh=0.3, # 检测阈值
det_size=(640, 640)
)
self.target_embedding = None
print("✅ GPU人脸识别系统初始化完成")
def set_target_face(self, image_path: str):
"""设置目标人脸"""
img = cv2.imread(image_path)
faces = self.app.get(img)
if faces:
self.target_embedding = faces[0].embedding
print(f"✅ 目标人脸特征提取完成")
return True
return False
def process_image(self, image_path: str, output_path: str):
"""处理图像并返回结果"""
start_time = time.time()
img = cv2.imread(image_path)
if img is None:
return None
# GPU推理
faces = self.app.get(img)
results = []
for i, face in enumerate(faces):
similarity = 0.0
if self.target_embedding is not None:
# 计算相似度
emb1 = face.embedding / np.linalg.norm(face.embedding)
emb2 = self.target_embedding / np.linalg.norm(self.target_embedding)
similarity = float(np.dot(emb1, emb2))
results.append({
'face_index': i,
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'det_score': face.det_score
})
# 绘制结果
bbox = face.bbox.astype(int)
color = (0, 255, 0) if similarity > 0.6 else (0, 0, 255)
cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
cv2.putText(img, f"{similarity:.3f}", (bbox[0], bbox[1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# 保存结果
cv2.imwrite(output_path, img)
processing_time = (time.time() - start_time) * 1000
return {
'faces_detected': len(faces),
'processing_time_ms': processing_time,
'results': results
}
# 初始化系统
system = FastGPUFaceRecognition()
# 4. 性能测试
print("\n4. 性能测试:")
# 设置目标人脸
if system.set_target_face("test_data/target_face.jpg"):
# 处理测试图像
result = system.process_image("test_data/multi_face_test.jpg", "test_data/gpu_result.jpg")
if result:
print(f"✅ 处理完成:")
print(f" 检测到人脸数: {result['faces_detected']}")
print(f" 处理时间: {result['processing_time_ms']:.1f}ms")
for face in result['results']:
status = "匹配" if face['similarity'] > 0.6 else "不匹配"
print(f" 人脸 {face['face_index'] + 1}: 相似度 {face['similarity']:.3f} ({status})")
# 5. 批量性能测试
print("\n5. 批量性能测试:")
# 创建多个测试图像
test_images = []
for i in range(5):
test_img = np.random.randint(0, 255, (600, 800, 3), dtype=np.uint8)
cv2.rectangle(test_img, (100, 100), (200, 200), (255, 255, 255), -1)
img_path = f"test_data/batch_test_{i}.jpg"
cv2.imwrite(img_path, test_img)
test_images.append(img_path)
total_time = 0
total_faces = 0
for i, img_path in enumerate(test_images):
result = system.process_image(img_path, f"test_data/batch_result_{i}.jpg")
if result:
total_time += result['processing_time_ms']
total_faces += result['faces_detected']
print(f" 图像 {i + 1}: {result['processing_time_ms']:.1f}ms, {result['faces_detected']}张人脸")
if len(test_images) > 0:
avg_time = total_time / len(test_images)
print(f" 平均处理时间: {avg_time:.1f}ms/张")
print(f" 总检测人脸: {total_faces}")
# 6. 与CPU性能对比可选
print("\n6. GPU vs CPU 性能对比:")
try:
# GPU测试
gpu_times = []
test_img = cv2.imread("test_data/multi_face_test.jpg")
for _ in range(10):
start = time.time()
faces = system.app.get(test_img)
gpu_times.append((time.time() - start) * 1000)
# CPU测试创建新的CPU实例
cpu_app = insightface.app.FaceAnalysis(name='buffalo_l')
cpu_app.prepare(ctx_id=-1) # CPU
cpu_times = []
for _ in range(10):
start = time.time()
faces = cpu_app.get(test_img)
cpu_times.append((time.time() - start) * 1000)
avg_gpu = np.mean(gpu_times)
avg_cpu = np.mean(cpu_times)
print(f" GPU平均推理时间: {avg_gpu:.1f}ms")
print(f" CPU平均推理时间: {avg_cpu:.1f}ms")
print(f" GPU加速比: {avg_cpu / avg_gpu:.1f}x")
except Exception as e:
print(f" 性能对比测试跳过: {e}")
print("\n" + "=" * 60)
print("🎉 GPU加速测试完成")
print("✅ 现在您的人脸识别系统正在使用GPU加速")
print("📁 结果图像保存在 test_data/ 目录中")
print("=" * 60)
if __name__ == "__main__":
comprehensive_gpu_test()

200
src/cuda_t3.py Normal file
View File

@@ -0,0 +1,200 @@
# final_gpu_test.py
import torch
import onnxruntime as ort
import insightface
import cv2
import numpy as np
import time
import os
from typing import List, Dict
def comprehensive_gpu_test():
"""全面的GPU测试"""
print("=" * 60)
print("最终GPU加速测试")
print("=" * 60)
# 1. 环境验证
print("1. 环境验证:")
print(f"✅ PyTorch: {torch.__version__}")
print(f"✅ PyTorch CUDA: {torch.cuda.is_available()}")
print(f"✅ GPU设备: {torch.cuda.get_device_name(0)}")
print(f"✅ ONNX Runtime GPU支持: {'CUDAExecutionProvider' in ort.get_available_providers()}")
print(f"✅ OpenCV: {cv2.__version__}")
print(f"✅ InsightFace: {insightface.__version__}")
# 2. 创建测试数据
print("\n2. 创建测试数据...")
os.makedirs("test_data", exist_ok=True)
# 创建目标人脸
target_face = np.random.randint(0, 255, (400, 400, 3), dtype=np.uint8)
cv2.rectangle(target_face, (150, 150), (250, 250), (255, 255, 255), -1) # 简单的人脸模拟
cv2.imwrite("test_data/target_face.jpg", target_face)
# 创建测试图像(多个人脸)
test_image = np.random.randint(0, 255, (800, 600, 3), dtype=np.uint8)
# 添加多个人脸区域
cv2.rectangle(test_image, (100, 100), (200, 200), (255, 255, 255), -1)
cv2.rectangle(test_image, (300, 150), (400, 250), (255, 255, 255), -1)
cv2.rectangle(test_image, (500, 200), (600, 300), (255, 255, 255), -1)
cv2.imwrite("test_data/multi_face_test.jpg", test_image)
print("✅ 测试数据创建完成")
# 3. GPU加速的人脸识别系统
print("\n3. 初始化GPU人脸识别系统...")
class FastGPUFaceRecognition:
def __init__(self):
self.app = insightface.app.FaceAnalysis(name='buffalo_l')
self.app.prepare(
ctx_id=0, # GPU 0
det_thresh=0.3, # 检测阈值
det_size=(640, 640)
)
self.target_embedding = None
print("✅ GPU人脸识别系统初始化完成")
def set_target_face(self, image_path: str):
"""设置目标人脸"""
img = cv2.imread(image_path)
faces = self.app.get(img)
if faces:
self.target_embedding = faces[0].embedding
print(f"✅ 目标人脸特征提取完成")
return True
return False
def process_image(self, image_path: str, output_path: str):
"""处理图像并返回结果"""
start_time = time.time()
img = cv2.imread(image_path)
if img is None:
return None
# GPU推理
faces = self.app.get(img)
results = []
for i, face in enumerate(faces):
similarity = 0.0
if self.target_embedding is not None:
# 计算相似度
emb1 = face.embedding / np.linalg.norm(face.embedding)
emb2 = self.target_embedding / np.linalg.norm(self.target_embedding)
similarity = float(np.dot(emb1, emb2))
results.append({
'face_index': i,
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'det_score': face.det_score
})
# 绘制结果
bbox = face.bbox.astype(int)
color = (0, 255, 0) if similarity > 0.6 else (0, 0, 255)
cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
cv2.putText(img, f"{similarity:.3f}", (bbox[0], bbox[1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# 保存结果
cv2.imwrite(output_path, img)
processing_time = (time.time() - start_time) * 1000
return {
'faces_detected': len(faces),
'processing_time_ms': processing_time,
'results': results
}
# 初始化系统
system = FastGPUFaceRecognition()
# 4. 性能测试
print("\n4. 性能测试:")
# 设置目标人脸
if system.set_target_face("test_data/target_face.jpg"):
# 处理测试图像
result = system.process_image("test_data/multi_face_test.jpg", "test_data/gpu_result.jpg")
if result:
print(f"✅ 处理完成:")
print(f" 检测到人脸数: {result['faces_detected']}")
print(f" 处理时间: {result['processing_time_ms']:.1f}ms")
for face in result['results']:
status = "匹配" if face['similarity'] > 0.6 else "不匹配"
print(f" 人脸 {face['face_index'] + 1}: 相似度 {face['similarity']:.3f} ({status})")
# 5. 批量性能测试
print("\n5. 批量性能测试:")
# 创建多个测试图像
test_images = []
for i in range(5):
test_img = np.random.randint(0, 255, (600, 800, 3), dtype=np.uint8)
cv2.rectangle(test_img, (100, 100), (200, 200), (255, 255, 255), -1)
img_path = f"test_data/batch_test_{i}.jpg"
cv2.imwrite(img_path, test_img)
test_images.append(img_path)
total_time = 0
total_faces = 0
for i, img_path in enumerate(test_images):
result = system.process_image(img_path, f"test_data/batch_result_{i}.jpg")
if result:
total_time += result['processing_time_ms']
total_faces += result['faces_detected']
print(f" 图像 {i + 1}: {result['processing_time_ms']:.1f}ms, {result['faces_detected']}张人脸")
if len(test_images) > 0:
avg_time = total_time / len(test_images)
print(f" 平均处理时间: {avg_time:.1f}ms/张")
print(f" 总检测人脸: {total_faces}")
# 6. 与CPU性能对比可选
print("\n6. GPU vs CPU 性能对比:")
try:
# GPU测试
gpu_times = []
test_img = cv2.imread("test_data/multi_face_test.jpg")
for _ in range(10):
start = time.time()
faces = system.app.get(test_img)
gpu_times.append((time.time() - start) * 1000)
# CPU测试创建新的CPU实例
cpu_app = insightface.app.FaceAnalysis(name='buffalo_l')
cpu_app.prepare(ctx_id=-1) # CPU
cpu_times = []
for _ in range(10):
start = time.time()
faces = cpu_app.get(test_img)
cpu_times.append((time.time() - start) * 1000)
avg_gpu = np.mean(gpu_times)
avg_cpu = np.mean(cpu_times)
print(f" GPU平均推理时间: {avg_gpu:.1f}ms")
print(f" CPU平均推理时间: {avg_cpu:.1f}ms")
print(f" GPU加速比: {avg_cpu / avg_gpu:.1f}x")
except Exception as e:
print(f" 性能对比测试跳过: {e}")
print("\n" + "=" * 60)
print("🎉 GPU加速测试完成")
print("✅ 现在您的人脸识别系统正在使用GPU加速")
print("📁 结果图像保存在 test_data/ 目录中")
print("=" * 60)
if __name__ == "__main__":
comprehensive_gpu_test()

View File

@@ -0,0 +1,249 @@
import os
import cv2
import numpy as np
from insightface.app import FaceAnalysis
from insightface.data import get_image as ins_get_image
import pickle
from typing import List, Tuple, Dict
import json
class FaceRecognitionSystem:
def __init__(self, model_name: str = 'buffalo_l'):
"""
初始化人脸识别系统
Args:
model_name: 模型名称,可选 'buffalo_l', 'buffalo_s'
"""
self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
self.app.prepare(ctx_id=0, det_size=(640, 640))
self.face_database = {} # 存储人脸特征向量
self.database_file = "face_database.pkl"
# 加载已有数据库
self.load_database()
def extract_face_features(self, image_path: str) -> List[Dict]:
"""
从图像中提取人脸特征
Args:
image_path: 图像路径
Returns:
List[Dict]: 包含人脸信息的列表
"""
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
faces = self.app.get(img)
results = []
for i, face in enumerate(faces):
face_info = {
'bbox': face.bbox.astype(int).tolist(), # 人脸框
'kps': face.kps.astype(int).tolist(), # 关键点
'embedding': face.embedding.tolist(), # 特征向量
'gender': face.gender, # 性别
'age': face.age # 年龄
}
results.append(face_info)
return results
def register_face(self, image_path: str, person_id: str):
"""
注册人脸到数据库
Args:
image_path: 图像路径
person_id: 人员ID
"""
faces = self.extract_face_features(image_path)
if not faces:
print(f"在图像 {image_path} 中未检测到人脸")
return False
if len(faces) > 1:
print(f"在图像 {image_path} 中检测到多个人脸,将使用第一个人脸")
# 存储第一个人脸的特征
self.face_database[person_id] = {
'embedding': np.array(faces[0]['embedding']),
'image_path': image_path
}
self.save_database()
print(f"成功注册人脸: {person_id}")
return True
def compare_faces(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
"""
计算两个人脸特征的相似度
Args:
embedding1: 特征向量1
embedding2: 特征向量2
Returns:
float: 相似度得分 (0-1之间越大越相似)
"""
# 计算余弦相似度
similarity = np.dot(embedding1, embedding2) / (
np.linalg.norm(embedding1) * np.linalg.norm(embedding2)
)
return float(similarity)
def one_vs_one(self, image_path1: str, image_path2: str) -> Tuple[float, bool]:
"""
1v1人脸比对
Args:
image_path1: 图像1路径
image_path2: 图像2路径
Returns:
Tuple[float, bool]: (相似度得分, 是否同一人)
"""
faces1 = self.extract_face_features(image_path1)
faces2 = self.extract_face_features(image_path2)
if not faces1 or not faces2:
return 0.0, False
embedding1 = np.array(faces1[0]['embedding'])
embedding2 = np.array(faces2[0]['embedding'])
similarity = self.compare_faces(embedding1, embedding2)
is_same = similarity > 0.6 # 阈值可根据实际情况调整
return similarity, is_same
def one_vs_many(self, image_path: str, threshold: float = 0.6) -> List[Tuple[str, float]]:
"""
1vn人脸检索
Args:
image_path: 查询图像路径
threshold: 相似度阈值
Returns:
List[Tuple[str, float]]: 匹配结果 (人员ID, 相似度)
"""
faces = self.extract_face_features(image_path)
if not faces:
return []
query_embedding = np.array(faces[0]['embedding'])
results = []
for person_id, data in self.face_database.items():
similarity = self.compare_faces(query_embedding, data['embedding'])
if similarity > threshold:
results.append((person_id, similarity))
# 按相似度降序排序
results.sort(key=lambda x: x[1], reverse=True)
return results
def save_database(self):
"""保存人脸数据库到文件"""
# 将numpy数组转换为列表以便序列化
save_data = {}
for person_id, data in self.face_database.items():
save_data[person_id] = {
'embedding': data['embedding'].tolist(),
'image_path': data['image_path']
}
with open(self.database_file, 'wb') as f:
pickle.dump(save_data, f)
def load_database(self):
"""从文件加载人脸数据库"""
if os.path.exists(self.database_file):
with open(self.database_file, 'rb') as f:
save_data = pickle.load(f)
# 将列表转换回numpy数组
for person_id, data in save_data.items():
self.face_database[person_id] = {
'embedding': np.array(data['embedding']),
'image_path': data['image_path']
}
print(f"已加载数据库,包含 {len(self.face_database)} 个人脸")
def visualize_detection(self, image_path: str, save_path: str = None):
"""
可视化人脸检测结果
Args:
image_path: 图像路径
save_path: 保存路径
"""
img = cv2.imread(image_path)
faces = self.app.get(img)
for face in faces:
# 绘制人脸框
bbox = face.bbox.astype(int)
cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)
# 绘制关键点
for kp in face.kps.astype(int):
cv2.circle(img, (kp[0], kp[1]), 2, (0, 0, 255), -1)
# 显示性别和年龄
info = f"{'M' if face.gender == 1 else 'F'}/{face.age}"
cv2.putText(img, info, (bbox[0], bbox[1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)
if save_path:
cv2.imwrite(save_path, img)
return img
# 使用示例
def main():
# 初始化系统
face_system = FaceRecognitionSystem()
# # 创建测试图像目录
# os.makedirs("test_images", exist_ok=True)
#
# # 示例1: 注册人脸
# print("=== 注册人脸 ===")
# # 假设你有一些人脸图像放在 test_images 目录下
# test_images = {
# "person_001": "test_images/person1.png",
# "person_002": "test_images/person2.jpg",
# # "person_003": "test_images/person3.jpg"
# }
#
# for person_id, img_path in test_images.items():
# if os.path.exists(img_path):
# face_system.register_face(img_path, person_id)
# 示例2: 1v1比对
print("\n=== 1v1人脸比对 ===")
img1 = "test_data/register/person1.png"
# img1 = "test_data/query/file___media_Photo_103_IMG_1737872809_085_IMG_20250126_142509.jpg"
img2 = "test_data/query/file___media_Photo_152_IMG_1737876072_134_IMG_20250126_151932.jpg"
if os.path.exists(img1) and os.path.exists(img2):
similarity, is_same = face_system.one_vs_one(img1, img2)
print(f"相似度: {similarity:.4f}, 是否同一人: {is_same}")
# # 示例3: 1vn检索
# print("\n=== 1vn人脸检索 ===")
# query_img = "test_images/query.jpg"
# if os.path.exists(query_img):
# results = face_system.one_vs_many(query_img)
# print("检索结果:")
# for person_id, score in results:
# print(f" {person_id}: {score:.4f}")
#
# # 示例4: 可视化检测结果
# print("\n=== 人脸检测可视化 ===")
# test_img = "test_images/test.jpg"
# if os.path.exists(test_img):
# output_img = face_system.visualize_detection(test_img, "output/detection_result.jpg")
# print("检测结果已保存到 output/detection_result.jpg")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,496 @@
import os
import cv2
import numpy as np
from insightface.app import FaceAnalysis
import pickle
from typing import List, Tuple, Dict, Optional
from datetime import datetime
import json
class MultiFaceRecognitionSystem:
"""
多人脸识别系统
支持多个人脸检测、识别和标注
"""
def __init__(self, model_name: str = 'buffalo_l'):
"""
初始化人脸识别系统
Args:
model_name: 模型名称,可选 'buffalo_l', 'buffalo_s'
"""
# 初始化InsightFace
self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
self.app.prepare(ctx_id=0, det_size=(640, 640))
# 人脸数据库
self.face_database = {}
self.database_file = "face_database.pkl"
# 加载已有数据库
self.load_database()
# 相似度阈值
self.similarity_threshold = 0.6
# 颜色配置
self.colors = {
'known': (0, 255, 0), # 绿色 - 已知人脸
'unknown': (0, 0, 255), # 红色 - 未知人脸
'text': (255, 255, 255), # 白色 - 文字
'landmark': (255, 0, 0) # 蓝色 - 关键点
}
def register_face(self, image_path: str, person_id: str) -> bool:
"""
注册单个人脸到数据库
Args:
image_path: 图像路径
person_id: 人员ID
Returns:
bool: 注册是否成功
"""
if not os.path.exists(image_path):
print(f"图像文件不存在: {image_path}")
return False
# 提取人脸特征
faces = self.extract_face_features(image_path)
if not faces:
print(f"在图像 {image_path} 中未检测到人脸")
return False
if len(faces) > 1:
print(f"在图像 {image_path} 中检测到多个人脸,将使用第一个人脸")
# 存储到数据库
self.face_database[person_id] = {
'embedding': np.array(faces[0]['embedding']),
'image_path': image_path,
'registration_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# 保存数据库
self.save_database()
print(f"成功注册人脸: {person_id}")
return True
def batch_register_faces(self, image_dir: str, id_prefix: str = "person") -> int:
"""
批量注册人脸
Args:
image_dir: 图像目录
id_prefix: ID前缀
Returns:
int: 成功注册的数量
"""
if not os.path.exists(image_dir):
print(f"目录不存在: {image_dir}")
return 0
registered_count = 0
image_files = [f for f in os.listdir(image_dir)
if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
for i, filename in enumerate(image_files):
image_path = os.path.join(image_dir, filename)
person_id = f"{id_prefix}_{i + 1:03d}"
if self.register_face(image_path, person_id):
registered_count += 1
print(f"批量注册完成: {registered_count}/{len(image_files)} 个人脸")
return registered_count
def extract_face_features(self, image_path: str) -> List[Dict]:
"""
从图像中提取所有人脸特征
Args:
image_path: 图像路径
Returns:
List[Dict]: 包含多个人脸信息的列表
"""
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
faces = self.app.get(img)
results = []
for i, face in enumerate(faces):
face_info = {
'bbox': face.bbox.astype(int).tolist(), # 人脸框 [x1, y1, x2, y2]
'kps': face.kps.astype(int).tolist(), # 关键点 [[x1,y1], [x2,y2], ...]
'embedding': face.embedding.tolist(), # 特征向量
'gender': 'Male' if face.gender == 1 else 'Female',
'age': int(face.age),
'det_score': float(face.det_score) # 检测置信度
}
results.append(face_info)
return results
def compare_faces(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
"""
计算两个人脸特征的相似度
Args:
embedding1: 特征向量1
embedding2: 特征向量2
Returns:
float: 余弦相似度 (0-1)
"""
# 归一化向量
embedding1 = embedding1 / np.linalg.norm(embedding1)
embedding2 = embedding2 / np.linalg.norm(embedding2)
# 计算余弦相似度
similarity = np.dot(embedding1, embedding2)
return float(similarity)
def search_face(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
"""
在数据库中搜索最相似的人脸
Args:
query_embedding: 查询特征向量
top_k: 返回最相似的k个结果
Returns:
List[Tuple[str, float]]: (人员ID, 相似度)
"""
if not self.face_database:
return []
results = []
query_embedding = query_embedding / np.linalg.norm(query_embedding)
for person_id, data in self.face_database.items():
db_embedding = data['embedding'] / np.linalg.norm(data['embedding'])
similarity = np.dot(query_embedding, db_embedding)
if similarity > self.similarity_threshold:
results.append((person_id, float(similarity)))
# 按相似度降序排序
results.sort(key=lambda x: x[1], reverse=True)
# 返回top_k个结果
return results[:top_k]
def recognize_faces_in_image(self, image_path: str, output_path: str = None,
draw_landmarks: bool = True, draw_info: bool = True) -> Dict:
"""
识别图像中的所有脸并进行标注
Args:
image_path: 输入图像路径
output_path: 输出图像路径
draw_landmarks: 是否绘制关键点
draw_info: 是否绘制详细信息
Returns:
Dict: 识别结果
"""
# 读取图像
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
# 提取人脸特征
faces = self.extract_face_features(image_path)
recognition_results = {
'image_path': image_path,
'faces_detected': len(faces),
'recognitions': []
}
# 对每个检测到的人脸进行识别
for i, face in enumerate(faces):
# 在数据库中搜索
query_embedding = np.array(face['embedding'])
search_results = self.search_face(query_embedding, top_k=1)
# 识别结果
person_id = "Unknown"
similarity = 0.0
if search_results:
person_id, similarity = search_results[0]
# 存储识别结果
face_recognition = {
'face_index': i,
'bbox': face['bbox'],
'person_id': person_id,
'similarity': similarity,
'gender': face['gender'],
'age': face['age'],
'det_score': face['det_score']
}
recognition_results['recognitions'].append(face_recognition)
# 在图像上绘制标注
self._draw_face_annotation(img, face_recognition, draw_landmarks, draw_info)
# 保存输出图像
if output_path:
os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)
cv2.imwrite(output_path, img)
print(f"标注图像已保存: {output_path}")
return recognition_results
def _draw_face_annotation(self, img: np.ndarray, face_info: Dict,
draw_landmarks: bool, draw_info: bool):
"""
在图像上绘制人脸标注
Args:
img: 图像数组
face_info: 人脸信息
draw_landmarks: 是否绘制关键点
draw_info: 是否绘制详细信息
"""
bbox = face_info['bbox'] # [x1, y1, x2, y2]
person_id = face_info['person_id']
similarity = face_info['similarity']
# 确定颜色:已知人脸用绿色,未知用红色
if person_id != "Unknown":
color = self.colors['known']
label = f"{person_id} ({similarity:.3f})"
else:
color = self.colors['unknown']
label = "Unknown"
# 绘制人脸框
cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
# 绘制关键点
if draw_landmarks and 'kps' in face_info:
for kp in face_info['kps']:
cv2.circle(img, (kp[0], kp[1]), 2, self.colors['landmark'], -1)
# 绘制信息文本
if draw_info:
# 基础信息
text_y = bbox[1] - 10
if text_y < 20:
text_y = bbox[3] + 20
cv2.putText(img, label, (bbox[0], text_y),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.colors['text'], 2)
# 详细信息
detail_text = f"{face_info['gender']}/{face_info['age']}"
cv2.putText(img, detail_text, (bbox[0], text_y + 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['text'], 1)
def batch_process_images(self, input_dir: str, output_dir: str,
image_extensions: tuple = ('.jpg', '.jpeg', '.png')) -> Dict:
"""
批量处理目录中的所有图像
Args:
input_dir: 输入图像目录
output_dir: 输出图像目录
image_extensions: 图像文件扩展名
Returns:
Dict: 批量处理结果统计
"""
if not os.path.exists(input_dir):
raise ValueError(f"输入目录不存在: {input_dir}")
os.makedirs(output_dir, exist_ok=True)
# 获取所有图像文件
image_files = []
for ext in image_extensions:
image_files.extend([f for f in os.listdir(input_dir) if f.lower().endswith(ext)])
print(f"找到 {len(image_files)} 个图像文件")
# 处理统计
stats = {
'total_images': len(image_files),
'processed_images': 0,
'total_faces': 0,
'recognized_faces': 0,
'results': []
}
# 批量处理
for image_file in image_files:
input_path = os.path.join(input_dir, image_file)
output_path = os.path.join(output_dir, f"annotated_{image_file}")
try:
# 处理单张图像
result = self.recognize_faces_in_image(input_path, output_path)
# 更新统计
stats['processed_images'] += 1
stats['total_faces'] += result['faces_detected']
recognized = sum(1 for face in result['recognitions'] if face['person_id'] != "Unknown")
stats['recognized_faces'] += recognized
stats['results'].append({
'image_file': image_file,
'faces_detected': result['faces_detected'],
'recognized_faces': recognized
})
print(f"处理完成: {image_file} -> 检测到 {result['faces_detected']} 张脸, 识别出 {recognized}")
except Exception as e:
print(f"处理图像 {image_file} 时出错: {e}")
# 保存处理统计
stats_path = os.path.join(output_dir, "processing_stats.json")
with open(stats_path, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=2, ensure_ascii=False)
print(f"批量处理完成! 统计信息已保存: {stats_path}")
return stats
def save_database(self):
"""保存人脸数据库到文件"""
# 转换为可序列化的格式
save_data = {}
for person_id, data in self.face_database.items():
save_data[person_id] = {
'embedding': data['embedding'].tolist(),
'image_path': data['image_path'],
'registration_time': data.get('registration_time', 'Unknown')
}
with open(self.database_file, 'wb') as f:
pickle.dump(save_data, f)
print(f"人脸数据库已保存: {self.database_file} (共 {len(self.face_database)} 人)")
def load_database(self):
"""从文件加载人脸数据库"""
if os.path.exists(self.database_file):
try:
with open(self.database_file, 'rb') as f:
save_data = pickle.load(f)
# 转换回numpy数组
for person_id, data in save_data.items():
self.face_database[person_id] = {
'embedding': np.array(data['embedding']),
'image_path': data['image_path'],
'registration_time': data.get('registration_time', 'Unknown')
}
print(f"人脸数据库已加载: {len(self.face_database)}")
except Exception as e:
print(f"加载数据库失败: {e}")
self.face_database = {}
else:
print("数据库文件不存在,将创建新数据库")
self.face_database = {}
def get_database_info(self) -> Dict:
"""获取数据库信息"""
return {
'total_persons': len(self.face_database),
'person_ids': list(self.face_database.keys()),
'database_file': self.database_file
}
def set_similarity_threshold(self, threshold: float):
"""设置相似度阈值"""
if 0 <= threshold <= 1:
self.similarity_threshold = threshold
print(f"相似度阈值已设置为: {threshold}")
else:
print("阈值必须在 0 到 1 之间")
# 使用示例和演示
def demo_usage():
"""演示使用方法"""
# 创建人脸识别系统
face_system = MultiFaceRecognitionSystem()
# 1. 注册人脸到数据库
print("=== 注册人脸 ===")
# 创建测试数据目录
os.makedirs("test_data/register", exist_ok=True)
os.makedirs("test_data/query", exist_ok=True)
os.makedirs("test_data/output", exist_ok=True)
# 假设你有一些人脸图像用于注册
# face_system.register_face("test_data/register/person1.jpg", "alice")
# face_system.register_face("test_data/register/person2.jpg", "bob")
# face_system.register_face("test_data/register/person3.jpg", "charlie")
# # 或者批量注册
# face_system.batch_register_faces("test_data/register", "person")
# 2. 查看数据库信息
print("\n=== 数据库信息 ===")
db_info = face_system.get_database_info()
print(f"数据库人数: {db_info['total_persons']}")
print(f"人员ID: {db_info['person_ids']}")
# 3. 单张图像识别示例
print("\n=== 单张图像识别 ===")
query_image = "test_data/query/multi_face.jpg" # 包含多个人的图像
if os.path.exists(query_image):
result = face_system.recognize_faces_in_image(
image_path=query_image,
output_path="test_data/output/annotated_multi_face.jpg",
draw_landmarks=True,
draw_info=True
)
print(f"检测到 {result['faces_detected']} 张人脸:")
for face in result['recognitions']:
status = "已知" if face['person_id'] != "Unknown" else "未知"
print(f" 人脸 {face['face_index'] + 1}: {face['person_id']} "
f"(相似度: {face['similarity']:.3f}, {face['gender']}/{face['age']}岁) - {status}")
else:
print(f"查询图像不存在: {query_image}")
print("请将包含多人脸的图像放在 test_data/query/ 目录下")
# 4. 批量处理示例
print("\n=== 批量处理图像 ===")
if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0:
stats = face_system.batch_process_images(
input_dir="test_data/query",
output_dir="test_data/output/batch_results"
)
print(f"批量处理统计:")
print(f" 总图像数: {stats['total_images']}")
print(f" 成功处理: {stats['processed_images']}")
print(f" 总人脸数: {stats['total_faces']}")
print(f" 识别出的人脸: {stats['recognized_faces']}")
# 5. 调整相似度阈值
print("\n=== 调整阈值 ===")
face_system.set_similarity_threshold(0.1) # 降低阈值,更宽松
# face_system.set_similarity_threshold(0.7) # 提高阈值,更严格
if __name__ == "__main__":
demo_usage()

View File

@@ -0,0 +1,481 @@
import os
import cv2
import numpy as np
from insightface.app import FaceAnalysis
import pickle
from typing import List, Tuple, Dict, Optional
from datetime import datetime
import json
class SingleFaceComparisonSystem:
"""
单人脸比对系统
在图像中检测多个人脸,并与单个目标人脸进行比对
"""
def __init__(self, model_name: str = 'buffalo_l'):
"""
初始化人脸比对系统
Args:
model_name: 模型名称,可选 'buffalo_l', 'buffalo_s'
"""
# 初始化InsightFace
self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
self.app.prepare(ctx_id=0,
det_thresh=0.35, # 降低阈值,检测更多模糊/小脸
det_size=(640, 640))
# 目标人脸特征
self.target_embedding = None
self.target_person_id = None
self.target_image_path = None
# 相似度阈值
self.similarity_threshold = 0.25
# 颜色配置
self.colors = {
'match_high': (0, 255, 0), # 绿色 - 高相似度
'match_medium': (0, 255, 255), # 黄色 - 中等相似度
'match_low': (0, 165, 255), # 橙色 - 低相似度
'no_match': (0, 0, 255), # 红色 - 不匹配
'text': (255, 255, 255), # 白色 - 文字
'landmark': (255, 0, 0) # 蓝色 - 关键点
}
def set_target_face(self, image_path: str, person_id: str = "target_person") -> bool:
"""
设置目标人脸
Args:
image_path: 目标人脸图像路径
person_id: 目标人员ID
Returns:
bool: 设置是否成功
"""
if not os.path.exists(image_path):
print(f"目标图像文件不存在: {image_path}")
return False
# 提取人脸特征
faces = self.extract_face_features(image_path)
if not faces:
print(f"在目标图像 {image_path} 中未检测到人脸")
return False
if len(faces) > 1:
print(f"在目标图像 {image_path} 中检测到多个人脸,将使用第一个人脸")
# 存储目标人脸特征
self.target_embedding = np.array(faces[0]['embedding'])
self.target_person_id = person_id
self.target_image_path = image_path
print(f"成功设置目标人脸: {person_id}")
return True
def extract_face_features(self, image_path: str) -> List[Dict]:
"""
从图像中提取所有人脸特征
Args:
image_path: 图像路径
Returns:
List[Dict]: 包含多个人脸信息的列表
"""
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
faces = self.app.get(img)
results = []
for i, face in enumerate(faces):
face_info = {
'bbox': face.bbox.astype(int).tolist(), # 人脸框 [x1, y1, x2, y2]
'kps': face.kps.astype(int).tolist(), # 关键点 [[x1,y1], [x2,y2], ...]
'embedding': face.embedding.tolist(), # 特征向量
'gender': 'Male' if face.gender == 1 else 'Female',
'age': int(face.age),
'det_score': float(face.det_score) # 检测置信度
}
results.append(face_info)
return results
def compare_with_target(self, query_embedding: np.ndarray) -> float:
"""
与目标人脸进行比对
Args:
query_embedding: 查询特征向量
Returns:
float: 相似度 (0-1)
"""
if self.target_embedding is None:
raise ValueError("未设置目标人脸")
# 归一化向量
query_embedding = query_embedding / np.linalg.norm(query_embedding)
target_embedding = self.target_embedding / np.linalg.norm(self.target_embedding)
# 计算余弦相似度
similarity = np.dot(query_embedding, target_embedding)
return float(similarity)
def get_similarity_color(self, similarity: float) -> tuple:
"""
根据相似度获取对应的颜色
Args:
similarity: 相似度 (0-1)
Returns:
tuple: BGR颜色值
"""
if similarity >= self.similarity_threshold:
return self.colors['match_high'] # 高相似度 - 绿色
elif similarity >= self.similarity_threshold/2:
return self.colors['match_low'] # 低相似度 - 橙色
else:
return self.colors['no_match'] # 不匹配 - 红色
def process_image_with_target_comparison(self, image_path: str, output_path: str = None,
draw_landmarks: bool = True, draw_info: bool = True) -> Dict:
"""
处理图像并与目标人脸进行比对
Args:
image_path: 输入图像路径
output_path: 输出图像路径
draw_landmarks: 是否绘制关键点
draw_info: 是否绘制详细信息
Returns:
Dict: 处理结果
"""
if self.target_embedding is None:
raise ValueError("请先设置目标人脸")
# 读取图像
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
# 提取人脸特征
faces = self.extract_face_features(image_path)
comparison_results = {
'image_path': image_path,
'target_person_id': self.target_person_id,
'faces_detected': len(faces),
'comparisons': []
}
# 对每个检测到的人脸进行比对
for i, face in enumerate(faces):
# 与目标人脸比对
query_embedding = np.array(face['embedding'])
similarity = self.compare_with_target(query_embedding)
# 存储比对结果
face_comparison = {
'face_index': i,
'bbox': face['bbox'],
'similarity': similarity,
'is_match': similarity >= self.similarity_threshold,
'gender': face['gender'],
'age': face['age'],
'det_score': face['det_score']
}
comparison_results['comparisons'].append(face_comparison)
# 在图像上绘制标注
self._draw_face_comparison_annotation(img, face_comparison, draw_landmarks, draw_info)
# 保存输出图像
if output_path:
os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)
cv2.imwrite(output_path, img)
print(f"比对结果图像已保存: {output_path}")
return comparison_results
def _draw_face_comparison_annotation(self, img: np.ndarray, face_info: Dict,
draw_landmarks: bool, draw_info: bool):
"""
在图像上绘制人脸比对标注
Args:
img: 图像数组
face_info: 人脸比对信息
draw_landmarks: 是否绘制关键点
draw_info: 是否绘制详细信息
"""
bbox = face_info['bbox'] # [x1, y1, x2, y2]
similarity = face_info['similarity']
is_match = face_info['is_match']
# 根据相似度确定颜色
color = self.get_similarity_color(similarity)
# 绘制人脸框
thickness = 3 if is_match else 2
cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, thickness)
# 绘制关键点
if draw_landmarks and 'kps' in face_info:
for kp in face_info['kps']:
cv2.circle(img, (kp[0], kp[1]), 2, self.colors['landmark'], -1)
# 绘制信息文本
if draw_info:
# 相似度信息
text_y = bbox[1] - 10
if text_y < 20:
text_y = bbox[3] + 20
similarity_text = f"Similarity: {similarity:.3f}"
match_status = "MATCH" if is_match else "NO MATCH"
cv2.putText(img, similarity_text, (bbox[0], text_y),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.colors['text'], 2)
# 匹配状态
cv2.putText(img, match_status, (bbox[0], text_y + 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
# 人脸属性信息
detail_text = f"{face_info['gender']}/{face_info['age']}"
cv2.putText(img, detail_text, (bbox[0], text_y + 50),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['text'], 1)
def batch_process_with_target(self, input_dir: str, output_dir: str,
image_extensions: tuple = ('.jpg', '.jpeg', '.png')) -> Dict:
"""
批量处理目录中的所有图像,与目标人脸进行比对
Args:
input_dir: 输入图像目录
output_dir: 输出图像目录
image_extensions: 图像文件扩展名
Returns:
Dict: 批量处理结果统计
"""
if self.target_embedding is None:
raise ValueError("请先设置目标人脸")
if not os.path.exists(input_dir):
raise ValueError(f"输入目录不存在: {input_dir}")
os.makedirs(output_dir, exist_ok=True)
# 获取所有图像文件
image_files = []
for ext in image_extensions:
image_files.extend([f for f in os.listdir(input_dir) if f.lower().endswith(ext)])
print(f"找到 {len(image_files)} 个图像文件")
# 处理统计
stats = {
'target_person_id': self.target_person_id,
'total_images': len(image_files),
'processed_images': 0,
'total_faces': 0,
'matched_faces': 0,
'max_similarity': 0.0,
'min_similarity': 1.0,
'avg_similarity': 0.0,
'results': []
}
all_similarities = []
# 批量处理
for image_file in image_files:
input_path = os.path.join(input_dir, image_file)
output_path = os.path.join(output_dir, f"compared_{image_file}")
try:
# 处理单张图像
result = self.process_image_with_target_comparison(input_path, output_path)
# 更新统计
stats['processed_images'] += 1
stats['total_faces'] += result['faces_detected']
image_similarities = [comp['similarity'] for comp in result['comparisons']]
all_similarities.extend(image_similarities)
if image_similarities:
stats['max_similarity'] = max(stats['max_similarity'], max(image_similarities))
stats['min_similarity'] = min(stats['min_similarity'], min(image_similarities))
matched_count = sum(1 for comp in result['comparisons'] if comp['is_match'])
stats['matched_faces'] += matched_count
stats['results'].append({
'image_file': image_file,
'faces_detected': result['faces_detected'],
'matched_faces': matched_count,
'max_similarity': max(image_similarities) if image_similarities else 0,
'min_similarity': min(image_similarities) if image_similarities else 0
})
print(f"处理完成: {image_file} -> 检测到 {result['faces_detected']} 张脸, "
f"匹配 {matched_count} 张, 最高相似度: {max(image_similarities) if image_similarities else 0:.3f}")
except Exception as e:
print(f"处理图像 {image_file} 时出错: {e}")
# 计算平均相似度
if all_similarities:
stats['avg_similarity'] = sum(all_similarities) / len(all_similarities)
# 保存处理统计
stats_path = os.path.join(output_dir, "comparison_stats.json")
with open(stats_path, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=2, ensure_ascii=False)
print(f"批量比对完成! 统计信息已保存: {stats_path}")
return stats
def set_similarity_threshold(self, threshold: float):
"""设置相似度阈值"""
if 0 <= threshold <= 1:
self.similarity_threshold = threshold
print(f"相似度阈值已设置为: {threshold}")
else:
print("阈值必须在 0 到 1 之间")
def get_target_info(self) -> Dict:
"""获取目标人脸信息"""
if self.target_embedding is None:
return {"status": "未设置目标人脸"}
return {
"status": "已设置",
"person_id": self.target_person_id,
"image_path": self.target_image_path,
"similarity_threshold": self.similarity_threshold
}
# 使用示例和演示
def demo_usage():
"""演示使用方法"""
# 创建人脸比对系统
face_system = SingleFaceComparisonSystem()
# 1. 设置目标人脸
print("=== 设置目标人脸 ===")
# 创建测试数据目录
os.makedirs("test_data/target", exist_ok=True)
os.makedirs("test_data/query", exist_ok=True)
os.makedirs("test_data/output", exist_ok=True)
target_image = "test_data/register/person2.jpg"
# target_image = "test_data/register/person1.png"
# 设置目标人脸
if os.path.exists(target_image):
face_system.set_target_face(target_image, "target_person")
else:
print(f"目标图像不存在: {target_image}")
print("请将目标人脸图像放在 test_data/target/ 目录下")
return
# 2. 查看目标信息
print("\n=== 目标人脸信息 ===")
target_info = face_system.get_target_info()
print(f"目标人员ID: {target_info['person_id']}")
print(f"目标图像路径: {target_info['image_path']}")
print(f"相似度阈值: {target_info['similarity_threshold']}")
# 3. 单张图像比对示例
print("\n=== 单张图像比对 ===")
query_image = "test_data/query/multi_face.jpg" # 包含多个人的图像
if os.path.exists(query_image):
result = face_system.process_image_with_target_comparison(
image_path=query_image,
output_path="test_data/output/compared_multi_face.jpg",
draw_landmarks=True,
draw_info=True
)
print(f"检测到 {result['faces_detected']} 张人脸:")
for face in result['comparisons']:
status = "匹配" if face['is_match'] else "不匹配"
color_name = "绿色" if face['similarity'] >= 0.8 else \
"黄色" if face['similarity'] >= 0.6 else \
"橙色" if face['similarity'] >= 0.4 else "红色"
print(f" 人脸 {face['face_index'] + 1}: 相似度 {face['similarity']:.3f} "
f"({color_name}, {status}), {face['gender']}/{face['age']}")
else:
print(f"查询图像不存在: {query_image}")
print("请将包含多人脸的图像放在 test_data/query/ 目录下")
# 4. 批量处理示例
print("\n=== 批量比对图像 ===")
if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0:
stats = face_system.batch_process_with_target(
input_dir="test_data/query",
output_dir="test_data/output/batch_comparison"
)
print(f"批量比对统计:")
print(f" 目标人员: {stats['target_person_id']}")
print(f" 总图像数: {stats['total_images']}")
print(f" 成功处理: {stats['processed_images']}")
print(f" 总人脸数: {stats['total_faces']}")
print(f" 匹配人脸: {stats['matched_faces']}")
print(f" 最高相似度: {stats['max_similarity']:.3f}")
print(f" 最低相似度: {stats['min_similarity']:.3f}")
print(f" 平均相似度: {stats['avg_similarity']:.3f}")
# 5. 调整相似度阈值
print("\n=== 调整阈值 ===")
face_system.set_similarity_threshold(0.25) # 提高阈值,更严格
# 高级使用示例
def advanced_usage():
"""高级使用示例"""
face_system = SingleFaceComparisonSystem()
# 设置目标人脸
face_system.set_target_face("path/to/target_face.jpg", "suspect_A")
# 设置更严格的阈值
face_system.set_similarity_threshold(0.75)
# 处理单个图像并保存结果
result = face_system.process_image_with_target_comparison(
image_path="path/to/surveillance_image.jpg",
output_path="path/to/analyzed_image.jpg"
)
# 输出详细信息
print(f"分析完成: {result['image_path']}")
print(f"目标人员: {result['target_person_id']}")
print(f"检测到人脸: {result['faces_detected']}")
for comp in result['comparisons']:
print(f"人脸 {comp['face_index'] + 1}: 相似度={comp['similarity']:.3f}, "
f"匹配={comp['is_match']}, 位置={comp['bbox']}")
if __name__ == "__main__":
demo_usage()