import os import cv2 import numpy as np from insightface.app import FaceAnalysis import pickle from typing import List, Tuple, Dict, Optional from datetime import datetime import json class SingleFaceComparisonSystem: """ 单人脸比对系统 在图像中检测多个人脸,并与单个目标人脸进行比对 """ def __init__(self, model_name: str = 'buffalo_l'): """ 初始化人脸比对系统 Args: model_name: 模型名称,可选 'buffalo_l', 'buffalo_s' 等 """ # 初始化InsightFace self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) self.app.prepare(ctx_id=0, det_thresh=0.35, # 降低阈值,检测更多模糊/小脸 det_size=(640, 640)) # 目标人脸特征 self.target_embedding = None self.target_person_id = None self.target_image_path = None # 相似度阈值 self.similarity_threshold = 0.3 # 颜色配置 self.colors = { 'match_high': (0, 255, 0), # 绿色 - 高相似度 'match_medium': (0, 255, 255), # 黄色 - 中等相似度 'match_low': (0, 165, 255), # 橙色 - 低相似度 'no_match': (0, 0, 255), # 红色 - 不匹配 'text': (255, 255, 255), # 白色 - 文字 'landmark': (255, 0, 0) # 蓝色 - 关键点 } def set_target_face(self, image_path: str, person_id: str = "target_person") -> bool: """ 设置目标人脸 Args: image_path: 目标人脸图像路径 person_id: 目标人员ID Returns: bool: 设置是否成功 """ if not os.path.exists(image_path): print(f"目标图像文件不存在: {image_path}") return False # 提取人脸特征 faces = self.extract_face_features(image_path) if not faces: print(f"在目标图像 {image_path} 中未检测到人脸") return False if len(faces) > 1: print(f"在目标图像 {image_path} 中检测到多个人脸,将使用第一个人脸") # 存储目标人脸特征 self.target_embedding = np.array(faces[0]['embedding']) self.target_person_id = person_id self.target_image_path = image_path print(f"成功设置目标人脸: {person_id}") return True def extract_face_features(self, image_path: str) -> List[Dict]: """ 从图像中提取所有人脸特征 Args: image_path: 图像路径 Returns: List[Dict]: 包含多个人脸信息的列表 """ img = cv2.imread(image_path) if img is None: raise ValueError(f"无法读取图像: {image_path}") faces = self.app.get(img) results = [] for i, face in enumerate(faces): face_info = { 'bbox': face.bbox.astype(int).tolist(), # 人脸框 [x1, y1, x2, y2] 'kps': face.kps.astype(int).tolist(), # 关键点 [[x1,y1], [x2,y2], ...] 'embedding': face.embedding.tolist(), # 特征向量 'gender': 'Male' if face.gender == 1 else 'Female', 'age': int(face.age), 'det_score': float(face.det_score) # 检测置信度 } results.append(face_info) return results def compare_with_target(self, query_embedding: np.ndarray) -> float: """ 与目标人脸进行比对 Args: query_embedding: 查询特征向量 Returns: float: 相似度 (0-1) """ if self.target_embedding is None: raise ValueError("未设置目标人脸") # 归一化向量 query_embedding = query_embedding / np.linalg.norm(query_embedding) target_embedding = self.target_embedding / np.linalg.norm(self.target_embedding) # 计算余弦相似度 similarity = np.dot(query_embedding, target_embedding) return float(similarity) def get_similarity_color(self, similarity: float) -> tuple: """ 根据相似度获取对应的颜色 Args: similarity: 相似度 (0-1) Returns: tuple: BGR颜色值 """ if similarity >= self.similarity_threshold: return self.colors['match_high'] # 高相似度 - 绿色 # elif similarity >= self.similarity_threshold/2: # return self.colors['match_low'] # 低相似度 - 橙色 else: return self.colors['no_match'] # 不匹配 - 红色 def process_image_with_target_comparison(self, image_path: str, output_path: str = None, draw_landmarks: bool = True, draw_info: bool = True) -> Dict: """ 处理图像并与目标人脸进行比对 Args: image_path: 输入图像路径 output_path: 输出图像路径 draw_landmarks: 是否绘制关键点 draw_info: 是否绘制详细信息 Returns: Dict: 处理结果 """ if self.target_embedding is None: raise ValueError("请先设置目标人脸") # 读取图像 img = cv2.imread(image_path) if img is None: raise ValueError(f"无法读取图像: {image_path}") # 提取人脸特征 faces = self.extract_face_features(image_path) comparison_results = { 'image_path': image_path, 'target_person_id': self.target_person_id, 'faces_detected': len(faces), 'comparisons': [] } # 对每个检测到的人脸进行比对 for i, face in enumerate(faces): # 与目标人脸比对 query_embedding = np.array(face['embedding']) similarity = self.compare_with_target(query_embedding) # 存储比对结果 face_comparison = { 'face_index': i, 'bbox': face['bbox'], 'similarity': similarity, 'is_match': similarity >= self.similarity_threshold, 'gender': face['gender'], 'age': face['age'], 'det_score': face['det_score'] } comparison_results['comparisons'].append(face_comparison) # 在图像上绘制标注 self._draw_face_comparison_annotation(img, face_comparison, draw_landmarks, draw_info) # 保存输出图像 if output_path: os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True) cv2.imwrite(output_path, img) print(f"比对结果图像已保存: {output_path}") return comparison_results def _draw_face_comparison_annotation(self, img: np.ndarray, face_info: Dict, draw_landmarks: bool, draw_info: bool): """ 在图像上绘制人脸比对标注 Args: img: 图像数组 face_info: 人脸比对信息 draw_landmarks: 是否绘制关键点 draw_info: 是否绘制详细信息 """ bbox = face_info['bbox'] # [x1, y1, x2, y2] similarity = face_info['similarity'] is_match = face_info['is_match'] # 根据相似度确定颜色 color = self.get_similarity_color(similarity) # 绘制人脸框 thickness = 3 if is_match else 2 cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, thickness) # 绘制关键点 if draw_landmarks and 'kps' in face_info: for kp in face_info['kps']: cv2.circle(img, (kp[0], kp[1]), 2, self.colors['landmark'], -1) # 绘制信息文本 if draw_info: # 相似度信息 text_y = bbox[1] - 10 if text_y < 20: text_y = bbox[3] + 20 similarity_text = f"Similarity: {similarity:.3f}" match_status = "MATCH" if is_match else "NO MATCH" cv2.putText(img, similarity_text, (bbox[0], text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.colors['text'], 2) # 匹配状态 cv2.putText(img, match_status, (bbox[0], text_y + 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) # 人脸属性信息 detail_text = f"{face_info['gender']}/{face_info['age']}" cv2.putText(img, detail_text, (bbox[0], text_y + 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['text'], 1) def batch_process_with_target(self, input_dir: str, output_dir: str, image_extensions: tuple = ('.jpg', '.jpeg', '.png')) -> Dict: """ 批量处理目录中的所有图像,与目标人脸进行比对 Args: input_dir: 输入图像目录 output_dir: 输出图像目录 image_extensions: 图像文件扩展名 Returns: Dict: 批量处理结果统计 """ if self.target_embedding is None: raise ValueError("请先设置目标人脸") if not os.path.exists(input_dir): raise ValueError(f"输入目录不存在: {input_dir}") os.makedirs(output_dir, exist_ok=True) # 获取所有图像文件 image_files = [] for ext in image_extensions: image_files.extend([f for f in os.listdir(input_dir) if f.lower().endswith(ext)]) print(f"找到 {len(image_files)} 个图像文件") # 处理统计 stats = { 'target_person_id': self.target_person_id, 'total_images': len(image_files), 'processed_images': 0, 'total_faces': 0, 'matched_faces': 0, 'max_similarity': 0.0, 'min_similarity': 1.0, 'avg_similarity': 0.0, 'results': [] } all_similarities = [] # 批量处理 for image_file in image_files: input_path = os.path.join(input_dir, image_file) output_path = os.path.join(output_dir, f"compared_{image_file}") try: # 处理单张图像 result = self.process_image_with_target_comparison(input_path, output_path) # 更新统计 stats['processed_images'] += 1 stats['total_faces'] += result['faces_detected'] image_similarities = [comp['similarity'] for comp in result['comparisons']] all_similarities.extend(image_similarities) if image_similarities: stats['max_similarity'] = max(stats['max_similarity'], max(image_similarities)) stats['min_similarity'] = min(stats['min_similarity'], min(image_similarities)) matched_count = sum(1 for comp in result['comparisons'] if comp['is_match']) stats['matched_faces'] += matched_count stats['results'].append({ 'image_file': image_file, 'faces_detected': result['faces_detected'], 'matched_faces': matched_count, 'max_similarity': max(image_similarities) if image_similarities else 0, 'min_similarity': min(image_similarities) if image_similarities else 0 }) print(f"处理完成: {image_file} -> 检测到 {result['faces_detected']} 张脸, " f"匹配 {matched_count} 张, 最高相似度: {max(image_similarities) if image_similarities else 0:.3f}") except Exception as e: print(f"处理图像 {image_file} 时出错: {e}") # 计算平均相似度 if all_similarities: stats['avg_similarity'] = sum(all_similarities) / len(all_similarities) # 保存处理统计 stats_path = os.path.join(output_dir, "comparison_stats.json") with open(stats_path, 'w', encoding='utf-8') as f: json.dump(stats, f, indent=2, ensure_ascii=False) print(f"批量比对完成! 统计信息已保存: {stats_path}") return stats def set_similarity_threshold(self, threshold: float): """设置相似度阈值""" if 0 <= threshold <= 1: self.similarity_threshold = threshold print(f"相似度阈值已设置为: {threshold}") else: print("阈值必须在 0 到 1 之间") def get_target_info(self) -> Dict: """获取目标人脸信息""" if self.target_embedding is None: return {"status": "未设置目标人脸"} return { "status": "已设置", "person_id": self.target_person_id, "image_path": self.target_image_path, "similarity_threshold": self.similarity_threshold } # 使用示例和演示 def demo_usage(): """演示使用方法""" # 创建人脸比对系统 face_system = SingleFaceComparisonSystem() # 1. 设置目标人脸 print("=== 设置目标人脸 ===") # 创建测试数据目录 os.makedirs("test_data/target", exist_ok=True) os.makedirs("test_data/query", exist_ok=True) os.makedirs("test_data/output", exist_ok=True) # target_image = "test_data/register/person2.jpg" # target_image = "test_data/register/person1.png" target_image = "test_data/register/ztk.jpg" # 设置目标人脸 if os.path.exists(target_image): face_system.set_target_face(target_image, "target_person") else: print(f"目标图像不存在: {target_image}") print("请将目标人脸图像放在 test_data/target/ 目录下") return # 2. 查看目标信息 print("\n=== 目标人脸信息 ===") target_info = face_system.get_target_info() print(f"目标人员ID: {target_info['person_id']}") print(f"目标图像路径: {target_info['image_path']}") print(f"相似度阈值: {target_info['similarity_threshold']}") # 3. 单张图像比对示例 print("\n=== 单张图像比对 ===") query_image = "test_data/query/multi_face.jpg" # 包含多个人的图像 if os.path.exists(query_image): result = face_system.process_image_with_target_comparison( image_path=query_image, output_path="test_data/output/compared_multi_face.jpg", draw_landmarks=True, draw_info=True ) print(f"检测到 {result['faces_detected']} 张人脸:") for face in result['comparisons']: status = "匹配" if face['is_match'] else "不匹配" color_name = "绿色" if face['similarity'] >= 0.8 else \ "黄色" if face['similarity'] >= 0.6 else \ "橙色" if face['similarity'] >= 0.4 else "红色" print(f" 人脸 {face['face_index'] + 1}: 相似度 {face['similarity']:.3f} " f"({color_name}, {status}), {face['gender']}/{face['age']}岁") else: print(f"查询图像不存在: {query_image}") print("请将包含多人脸的图像放在 test_data/query/ 目录下") # 4. 批量处理示例 print("\n=== 批量比对图像 ===") if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0: stats = face_system.batch_process_with_target( input_dir="test_data/query/2", output_dir="test_data/output/batch_comparison/2" ) print(f"批量比对统计:") print(f" 目标人员: {stats['target_person_id']}") print(f" 总图像数: {stats['total_images']}") print(f" 成功处理: {stats['processed_images']}") print(f" 总人脸数: {stats['total_faces']}") print(f" 匹配人脸: {stats['matched_faces']}") print(f" 最高相似度: {stats['max_similarity']:.3f}") print(f" 最低相似度: {stats['min_similarity']:.3f}") print(f" 平均相似度: {stats['avg_similarity']:.3f}") # 5. 调整相似度阈值 print("\n=== 调整阈值 ===") face_system.set_similarity_threshold(0.3) # 提高阈值,更严格 # 高级使用示例 def advanced_usage(): """高级使用示例""" face_system = SingleFaceComparisonSystem() # 设置目标人脸 face_system.set_target_face("path/to/target_face.jpg", "suspect_A") # 设置更严格的阈值 face_system.set_similarity_threshold(0.75) # 处理单个图像并保存结果 result = face_system.process_image_with_target_comparison( image_path="path/to/surveillance_image.jpg", output_path="path/to/analyzed_image.jpg" ) # 输出详细信息 print(f"分析完成: {result['image_path']}") print(f"目标人员: {result['target_person_id']}") print(f"检测到人脸: {result['faces_detected']}") for comp in result['comparisons']: print(f"人脸 {comp['face_index'] + 1}: 相似度={comp['similarity']:.3f}, " f"匹配={comp['is_match']}, 位置={comp['bbox']}") if __name__ == "__main__": demo_usage()