Files
SupervisorAI/backup/face_recognition_system_3.py
2025-12-20 18:07:49 +08:00

482 lines
17 KiB
Python

import os
import cv2
import numpy as np
from insightface.app import FaceAnalysis
import pickle
from typing import List, Tuple, Dict, Optional
from datetime import datetime
import json
class SingleFaceComparisonSystem:
"""
单人脸比对系统
在图像中检测多个人脸,并与单个目标人脸进行比对
"""
def __init__(self, model_name: str = 'buffalo_l'):
"""
初始化人脸比对系统
Args:
model_name: 模型名称,可选 'buffalo_l', 'buffalo_s'
"""
# 初始化InsightFace
self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
self.app.prepare(ctx_id=0,
det_thresh=0.35, # 降低阈值,检测更多模糊/小脸
det_size=(640, 640))
# 目标人脸特征
self.target_embedding = None
self.target_person_id = None
self.target_image_path = None
# 相似度阈值
self.similarity_threshold = 0.3
# 颜色配置
self.colors = {
'match_high': (0, 255, 0), # 绿色 - 高相似度
'match_medium': (0, 255, 255), # 黄色 - 中等相似度
'match_low': (0, 165, 255), # 橙色 - 低相似度
'no_match': (0, 0, 255), # 红色 - 不匹配
'text': (255, 255, 255), # 白色 - 文字
'landmark': (255, 0, 0) # 蓝色 - 关键点
}
def set_target_face(self, image_path: str, person_id: str = "target_person") -> bool:
"""
设置目标人脸
Args:
image_path: 目标人脸图像路径
person_id: 目标人员ID
Returns:
bool: 设置是否成功
"""
if not os.path.exists(image_path):
print(f"目标图像文件不存在: {image_path}")
return False
# 提取人脸特征
faces = self.extract_face_features(image_path)
if not faces:
print(f"在目标图像 {image_path} 中未检测到人脸")
return False
if len(faces) > 1:
print(f"在目标图像 {image_path} 中检测到多个人脸,将使用第一个人脸")
# 存储目标人脸特征
self.target_embedding = np.array(faces[0]['embedding'])
self.target_person_id = person_id
self.target_image_path = image_path
print(f"成功设置目标人脸: {person_id}")
return True
def extract_face_features(self, image_path: str) -> List[Dict]:
"""
从图像中提取所有人脸特征
Args:
image_path: 图像路径
Returns:
List[Dict]: 包含多个人脸信息的列表
"""
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
faces = self.app.get(img)
results = []
for i, face in enumerate(faces):
face_info = {
'bbox': face.bbox.astype(int).tolist(), # 人脸框 [x1, y1, x2, y2]
'kps': face.kps.astype(int).tolist(), # 关键点 [[x1,y1], [x2,y2], ...]
'embedding': face.embedding.tolist(), # 特征向量
'gender': 'Male' if face.gender == 1 else 'Female',
'age': int(face.age),
'det_score': float(face.det_score) # 检测置信度
}
results.append(face_info)
return results
def compare_with_target(self, query_embedding: np.ndarray) -> float:
"""
与目标人脸进行比对
Args:
query_embedding: 查询特征向量
Returns:
float: 相似度 (0-1)
"""
if self.target_embedding is None:
raise ValueError("未设置目标人脸")
# 归一化向量
query_embedding = query_embedding / np.linalg.norm(query_embedding)
target_embedding = self.target_embedding / np.linalg.norm(self.target_embedding)
# 计算余弦相似度
similarity = np.dot(query_embedding, target_embedding)
return float(similarity)
def get_similarity_color(self, similarity: float) -> tuple:
"""
根据相似度获取对应的颜色
Args:
similarity: 相似度 (0-1)
Returns:
tuple: BGR颜色值
"""
if similarity >= self.similarity_threshold:
return self.colors['match_high'] # 高相似度 - 绿色
# elif similarity >= self.similarity_threshold/2:
# return self.colors['match_low'] # 低相似度 - 橙色
else:
return self.colors['no_match'] # 不匹配 - 红色
def process_image_with_target_comparison(self, image_path: str, output_path: str = None,
draw_landmarks: bool = True, draw_info: bool = True) -> Dict:
"""
处理图像并与目标人脸进行比对
Args:
image_path: 输入图像路径
output_path: 输出图像路径
draw_landmarks: 是否绘制关键点
draw_info: 是否绘制详细信息
Returns:
Dict: 处理结果
"""
if self.target_embedding is None:
raise ValueError("请先设置目标人脸")
# 读取图像
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
# 提取人脸特征
faces = self.extract_face_features(image_path)
comparison_results = {
'image_path': image_path,
'target_person_id': self.target_person_id,
'faces_detected': len(faces),
'comparisons': []
}
# 对每个检测到的人脸进行比对
for i, face in enumerate(faces):
# 与目标人脸比对
query_embedding = np.array(face['embedding'])
similarity = self.compare_with_target(query_embedding)
# 存储比对结果
face_comparison = {
'face_index': i,
'bbox': face['bbox'],
'similarity': similarity,
'is_match': similarity >= self.similarity_threshold,
'gender': face['gender'],
'age': face['age'],
'det_score': face['det_score']
}
comparison_results['comparisons'].append(face_comparison)
# 在图像上绘制标注
self._draw_face_comparison_annotation(img, face_comparison, draw_landmarks, draw_info)
# 保存输出图像
if output_path:
os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)
cv2.imwrite(output_path, img)
print(f"比对结果图像已保存: {output_path}")
return comparison_results
def _draw_face_comparison_annotation(self, img: np.ndarray, face_info: Dict,
draw_landmarks: bool, draw_info: bool):
"""
在图像上绘制人脸比对标注
Args:
img: 图像数组
face_info: 人脸比对信息
draw_landmarks: 是否绘制关键点
draw_info: 是否绘制详细信息
"""
bbox = face_info['bbox'] # [x1, y1, x2, y2]
similarity = face_info['similarity']
is_match = face_info['is_match']
# 根据相似度确定颜色
color = self.get_similarity_color(similarity)
# 绘制人脸框
thickness = 3 if is_match else 2
cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, thickness)
# 绘制关键点
if draw_landmarks and 'kps' in face_info:
for kp in face_info['kps']:
cv2.circle(img, (kp[0], kp[1]), 2, self.colors['landmark'], -1)
# 绘制信息文本
if draw_info:
# 相似度信息
text_y = bbox[1] - 10
if text_y < 20:
text_y = bbox[3] + 20
similarity_text = f"Similarity: {similarity:.3f}"
match_status = "MATCH" if is_match else "NO MATCH"
cv2.putText(img, similarity_text, (bbox[0], text_y),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.colors['text'], 2)
# 匹配状态
cv2.putText(img, match_status, (bbox[0], text_y + 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
# 人脸属性信息
detail_text = f"{face_info['gender']}/{face_info['age']}"
cv2.putText(img, detail_text, (bbox[0], text_y + 50),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['text'], 1)
def batch_process_with_target(self, input_dir: str, output_dir: str,
image_extensions: tuple = ('.jpg', '.jpeg', '.png')) -> Dict:
"""
批量处理目录中的所有图像,与目标人脸进行比对
Args:
input_dir: 输入图像目录
output_dir: 输出图像目录
image_extensions: 图像文件扩展名
Returns:
Dict: 批量处理结果统计
"""
if self.target_embedding is None:
raise ValueError("请先设置目标人脸")
if not os.path.exists(input_dir):
raise ValueError(f"输入目录不存在: {input_dir}")
os.makedirs(output_dir, exist_ok=True)
# 获取所有图像文件
image_files = []
for ext in image_extensions:
image_files.extend([f for f in os.listdir(input_dir) if f.lower().endswith(ext)])
print(f"找到 {len(image_files)} 个图像文件")
# 处理统计
stats = {
'target_person_id': self.target_person_id,
'total_images': len(image_files),
'processed_images': 0,
'total_faces': 0,
'matched_faces': 0,
'max_similarity': 0.0,
'min_similarity': 1.0,
'avg_similarity': 0.0,
'results': []
}
all_similarities = []
# 批量处理
for image_file in image_files:
input_path = os.path.join(input_dir, image_file)
output_path = os.path.join(output_dir, f"compared_{image_file}")
try:
# 处理单张图像
result = self.process_image_with_target_comparison(input_path, output_path)
# 更新统计
stats['processed_images'] += 1
stats['total_faces'] += result['faces_detected']
image_similarities = [comp['similarity'] for comp in result['comparisons']]
all_similarities.extend(image_similarities)
if image_similarities:
stats['max_similarity'] = max(stats['max_similarity'], max(image_similarities))
stats['min_similarity'] = min(stats['min_similarity'], min(image_similarities))
matched_count = sum(1 for comp in result['comparisons'] if comp['is_match'])
stats['matched_faces'] += matched_count
stats['results'].append({
'image_file': image_file,
'faces_detected': result['faces_detected'],
'matched_faces': matched_count,
'max_similarity': max(image_similarities) if image_similarities else 0,
'min_similarity': min(image_similarities) if image_similarities else 0
})
print(f"处理完成: {image_file} -> 检测到 {result['faces_detected']} 张脸, "
f"匹配 {matched_count} 张, 最高相似度: {max(image_similarities) if image_similarities else 0:.3f}")
except Exception as e:
print(f"处理图像 {image_file} 时出错: {e}")
# 计算平均相似度
if all_similarities:
stats['avg_similarity'] = sum(all_similarities) / len(all_similarities)
# 保存处理统计
stats_path = os.path.join(output_dir, "comparison_stats.json")
with open(stats_path, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=2, ensure_ascii=False)
print(f"批量比对完成! 统计信息已保存: {stats_path}")
return stats
def set_similarity_threshold(self, threshold: float):
"""设置相似度阈值"""
if 0 <= threshold <= 1:
self.similarity_threshold = threshold
print(f"相似度阈值已设置为: {threshold}")
else:
print("阈值必须在 0 到 1 之间")
def get_target_info(self) -> Dict:
"""获取目标人脸信息"""
if self.target_embedding is None:
return {"status": "未设置目标人脸"}
return {
"status": "已设置",
"person_id": self.target_person_id,
"image_path": self.target_image_path,
"similarity_threshold": self.similarity_threshold
}
# 使用示例和演示
def demo_usage():
"""演示使用方法"""
# 创建人脸比对系统
face_system = SingleFaceComparisonSystem()
# 1. 设置目标人脸
print("=== 设置目标人脸 ===")
# 创建测试数据目录
os.makedirs("test_data/target", exist_ok=True)
os.makedirs("test_data/query", exist_ok=True)
os.makedirs("test_data/output", exist_ok=True)
# target_image = "test_data/register/person2.jpg"
# target_image = "test_data/register/person1.png"
target_image = "test_data/register/ztk.jpg"
# 设置目标人脸
if os.path.exists(target_image):
face_system.set_target_face(target_image, "target_person")
else:
print(f"目标图像不存在: {target_image}")
print("请将目标人脸图像放在 test_data/target/ 目录下")
return
# 2. 查看目标信息
print("\n=== 目标人脸信息 ===")
target_info = face_system.get_target_info()
print(f"目标人员ID: {target_info['person_id']}")
print(f"目标图像路径: {target_info['image_path']}")
print(f"相似度阈值: {target_info['similarity_threshold']}")
# 3. 单张图像比对示例
print("\n=== 单张图像比对 ===")
query_image = "test_data/query/multi_face.jpg" # 包含多个人的图像
if os.path.exists(query_image):
result = face_system.process_image_with_target_comparison(
image_path=query_image,
output_path="test_data/output/compared_multi_face.jpg",
draw_landmarks=True,
draw_info=True
)
print(f"检测到 {result['faces_detected']} 张人脸:")
for face in result['comparisons']:
status = "匹配" if face['is_match'] else "不匹配"
color_name = "绿色" if face['similarity'] >= 0.8 else \
"黄色" if face['similarity'] >= 0.6 else \
"橙色" if face['similarity'] >= 0.4 else "红色"
print(f" 人脸 {face['face_index'] + 1}: 相似度 {face['similarity']:.3f} "
f"({color_name}, {status}), {face['gender']}/{face['age']}")
else:
print(f"查询图像不存在: {query_image}")
print("请将包含多人脸的图像放在 test_data/query/ 目录下")
# 4. 批量处理示例
print("\n=== 批量比对图像 ===")
if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0:
stats = face_system.batch_process_with_target(
input_dir="test_data/query/2",
output_dir="test_data/output/batch_comparison/2"
)
print(f"批量比对统计:")
print(f" 目标人员: {stats['target_person_id']}")
print(f" 总图像数: {stats['total_images']}")
print(f" 成功处理: {stats['processed_images']}")
print(f" 总人脸数: {stats['total_faces']}")
print(f" 匹配人脸: {stats['matched_faces']}")
print(f" 最高相似度: {stats['max_similarity']:.3f}")
print(f" 最低相似度: {stats['min_similarity']:.3f}")
print(f" 平均相似度: {stats['avg_similarity']:.3f}")
# 5. 调整相似度阈值
print("\n=== 调整阈值 ===")
face_system.set_similarity_threshold(0.3) # 提高阈值,更严格
# 高级使用示例
def advanced_usage():
"""高级使用示例"""
face_system = SingleFaceComparisonSystem()
# 设置目标人脸
face_system.set_target_face("path/to/target_face.jpg", "suspect_A")
# 设置更严格的阈值
face_system.set_similarity_threshold(0.75)
# 处理单个图像并保存结果
result = face_system.process_image_with_target_comparison(
image_path="path/to/surveillance_image.jpg",
output_path="path/to/analyzed_image.jpg"
)
# 输出详细信息
print(f"分析完成: {result['image_path']}")
print(f"目标人员: {result['target_person_id']}")
print(f"检测到人脸: {result['faces_detected']}")
for comp in result['comparisons']:
print(f"人脸 {comp['face_index'] + 1}: 相似度={comp['similarity']:.3f}, "
f"匹配={comp['is_match']}, 位置={comp['bbox']}")
if __name__ == "__main__":
demo_usage()