import os import cv2 import numpy as np from insightface.app import FaceAnalysis import pickle from typing import List, Tuple, Dict, Optional from datetime import datetime import json class MultiFaceRecognitionSystem: """ 多人脸识别系统 支持多个人脸检测、识别和标注 """ def __init__(self, model_name: str = 'buffalo_l'): """ 初始化人脸识别系统 Args: model_name: 模型名称,可选 'buffalo_l', 'buffalo_s' 等 """ # 初始化InsightFace self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) self.app.prepare(ctx_id=0, det_size=(640, 640)) # 人脸数据库 self.face_database = {} self.database_file = "face_database.pkl" # 加载已有数据库 self.load_database() # 相似度阈值 self.similarity_threshold = 0.6 # 颜色配置 self.colors = { 'known': (0, 255, 0), # 绿色 - 已知人脸 'unknown': (0, 0, 255), # 红色 - 未知人脸 'text': (255, 255, 255), # 白色 - 文字 'landmark': (255, 0, 0) # 蓝色 - 关键点 } def register_face(self, image_path: str, person_id: str) -> bool: """ 注册单个人脸到数据库 Args: image_path: 图像路径 person_id: 人员ID Returns: bool: 注册是否成功 """ if not os.path.exists(image_path): print(f"图像文件不存在: {image_path}") return False # 提取人脸特征 faces = self.extract_face_features(image_path) if not faces: print(f"在图像 {image_path} 中未检测到人脸") return False if len(faces) > 1: print(f"在图像 {image_path} 中检测到多个人脸,将使用第一个人脸") # 存储到数据库 self.face_database[person_id] = { 'embedding': np.array(faces[0]['embedding']), 'image_path': image_path, 'registration_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S") } # 保存数据库 self.save_database() print(f"成功注册人脸: {person_id}") return True def batch_register_faces(self, image_dir: str, id_prefix: str = "person") -> int: """ 批量注册人脸 Args: image_dir: 图像目录 id_prefix: ID前缀 Returns: int: 成功注册的数量 """ if not os.path.exists(image_dir): print(f"目录不存在: {image_dir}") return 0 registered_count = 0 image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))] for i, filename in enumerate(image_files): image_path = os.path.join(image_dir, filename) person_id = f"{id_prefix}_{i + 1:03d}" if self.register_face(image_path, person_id): registered_count += 1 print(f"批量注册完成: {registered_count}/{len(image_files)} 个人脸") return registered_count def extract_face_features(self, image_path: str) -> List[Dict]: """ 从图像中提取所有人脸特征 Args: image_path: 图像路径 Returns: List[Dict]: 包含多个人脸信息的列表 """ img = cv2.imread(image_path) if img is None: raise ValueError(f"无法读取图像: {image_path}") faces = self.app.get(img) results = [] for i, face in enumerate(faces): face_info = { 'bbox': face.bbox.astype(int).tolist(), # 人脸框 [x1, y1, x2, y2] 'kps': face.kps.astype(int).tolist(), # 关键点 [[x1,y1], [x2,y2], ...] 'embedding': face.embedding.tolist(), # 特征向量 'gender': 'Male' if face.gender == 1 else 'Female', 'age': int(face.age), 'det_score': float(face.det_score) # 检测置信度 } results.append(face_info) return results def compare_faces(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float: """ 计算两个人脸特征的相似度 Args: embedding1: 特征向量1 embedding2: 特征向量2 Returns: float: 余弦相似度 (0-1) """ # 归一化向量 embedding1 = embedding1 / np.linalg.norm(embedding1) embedding2 = embedding2 / np.linalg.norm(embedding2) # 计算余弦相似度 similarity = np.dot(embedding1, embedding2) return float(similarity) def search_face(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]: """ 在数据库中搜索最相似的人脸 Args: query_embedding: 查询特征向量 top_k: 返回最相似的k个结果 Returns: List[Tuple[str, float]]: (人员ID, 相似度) """ if not self.face_database: return [] results = [] query_embedding = query_embedding / np.linalg.norm(query_embedding) for person_id, data in self.face_database.items(): db_embedding = data['embedding'] / np.linalg.norm(data['embedding']) similarity = np.dot(query_embedding, db_embedding) if similarity > self.similarity_threshold: results.append((person_id, float(similarity))) # 按相似度降序排序 results.sort(key=lambda x: x[1], reverse=True) # 返回top_k个结果 return results[:top_k] def recognize_faces_in_image(self, image_path: str, output_path: str = None, draw_landmarks: bool = True, draw_info: bool = True) -> Dict: """ 识别图像中的所有脸并进行标注 Args: image_path: 输入图像路径 output_path: 输出图像路径 draw_landmarks: 是否绘制关键点 draw_info: 是否绘制详细信息 Returns: Dict: 识别结果 """ # 读取图像 img = cv2.imread(image_path) if img is None: raise ValueError(f"无法读取图像: {image_path}") # 提取人脸特征 faces = self.extract_face_features(image_path) recognition_results = { 'image_path': image_path, 'faces_detected': len(faces), 'recognitions': [] } # 对每个检测到的人脸进行识别 for i, face in enumerate(faces): # 在数据库中搜索 query_embedding = np.array(face['embedding']) search_results = self.search_face(query_embedding, top_k=1) # 识别结果 person_id = "Unknown" similarity = 0.0 if search_results: person_id, similarity = search_results[0] # 存储识别结果 face_recognition = { 'face_index': i, 'bbox': face['bbox'], 'person_id': person_id, 'similarity': similarity, 'gender': face['gender'], 'age': face['age'], 'det_score': face['det_score'] } recognition_results['recognitions'].append(face_recognition) # 在图像上绘制标注 self._draw_face_annotation(img, face_recognition, draw_landmarks, draw_info) # 保存输出图像 if output_path: os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True) cv2.imwrite(output_path, img) print(f"标注图像已保存: {output_path}") return recognition_results def _draw_face_annotation(self, img: np.ndarray, face_info: Dict, draw_landmarks: bool, draw_info: bool): """ 在图像上绘制人脸标注 Args: img: 图像数组 face_info: 人脸信息 draw_landmarks: 是否绘制关键点 draw_info: 是否绘制详细信息 """ bbox = face_info['bbox'] # [x1, y1, x2, y2] person_id = face_info['person_id'] similarity = face_info['similarity'] # 确定颜色:已知人脸用绿色,未知用红色 if person_id != "Unknown": color = self.colors['known'] label = f"{person_id} ({similarity:.3f})" else: color = self.colors['unknown'] label = "Unknown" # 绘制人脸框 cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2) # 绘制关键点 if draw_landmarks and 'kps' in face_info: for kp in face_info['kps']: cv2.circle(img, (kp[0], kp[1]), 2, self.colors['landmark'], -1) # 绘制信息文本 if draw_info: # 基础信息 text_y = bbox[1] - 10 if text_y < 20: text_y = bbox[3] + 20 cv2.putText(img, label, (bbox[0], text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.colors['text'], 2) # 详细信息 detail_text = f"{face_info['gender']}/{face_info['age']}" cv2.putText(img, detail_text, (bbox[0], text_y + 25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['text'], 1) def batch_process_images(self, input_dir: str, output_dir: str, image_extensions: tuple = ('.jpg', '.jpeg', '.png')) -> Dict: """ 批量处理目录中的所有图像 Args: input_dir: 输入图像目录 output_dir: 输出图像目录 image_extensions: 图像文件扩展名 Returns: Dict: 批量处理结果统计 """ if not os.path.exists(input_dir): raise ValueError(f"输入目录不存在: {input_dir}") os.makedirs(output_dir, exist_ok=True) # 获取所有图像文件 image_files = [] for ext in image_extensions: image_files.extend([f for f in os.listdir(input_dir) if f.lower().endswith(ext)]) print(f"找到 {len(image_files)} 个图像文件") # 处理统计 stats = { 'total_images': len(image_files), 'processed_images': 0, 'total_faces': 0, 'recognized_faces': 0, 'results': [] } # 批量处理 for image_file in image_files: input_path = os.path.join(input_dir, image_file) output_path = os.path.join(output_dir, f"annotated_{image_file}") try: # 处理单张图像 result = self.recognize_faces_in_image(input_path, output_path) # 更新统计 stats['processed_images'] += 1 stats['total_faces'] += result['faces_detected'] recognized = sum(1 for face in result['recognitions'] if face['person_id'] != "Unknown") stats['recognized_faces'] += recognized stats['results'].append({ 'image_file': image_file, 'faces_detected': result['faces_detected'], 'recognized_faces': recognized }) print(f"处理完成: {image_file} -> 检测到 {result['faces_detected']} 张脸, 识别出 {recognized} 张") except Exception as e: print(f"处理图像 {image_file} 时出错: {e}") # 保存处理统计 stats_path = os.path.join(output_dir, "processing_stats.json") with open(stats_path, 'w', encoding='utf-8') as f: json.dump(stats, f, indent=2, ensure_ascii=False) print(f"批量处理完成! 统计信息已保存: {stats_path}") return stats def save_database(self): """保存人脸数据库到文件""" # 转换为可序列化的格式 save_data = {} for person_id, data in self.face_database.items(): save_data[person_id] = { 'embedding': data['embedding'].tolist(), 'image_path': data['image_path'], 'registration_time': data.get('registration_time', 'Unknown') } with open(self.database_file, 'wb') as f: pickle.dump(save_data, f) print(f"人脸数据库已保存: {self.database_file} (共 {len(self.face_database)} 人)") def load_database(self): """从文件加载人脸数据库""" if os.path.exists(self.database_file): try: with open(self.database_file, 'rb') as f: save_data = pickle.load(f) # 转换回numpy数组 for person_id, data in save_data.items(): self.face_database[person_id] = { 'embedding': np.array(data['embedding']), 'image_path': data['image_path'], 'registration_time': data.get('registration_time', 'Unknown') } print(f"人脸数据库已加载: {len(self.face_database)} 人") except Exception as e: print(f"加载数据库失败: {e}") self.face_database = {} else: print("数据库文件不存在,将创建新数据库") self.face_database = {} def get_database_info(self) -> Dict: """获取数据库信息""" return { 'total_persons': len(self.face_database), 'person_ids': list(self.face_database.keys()), 'database_file': self.database_file } def set_similarity_threshold(self, threshold: float): """设置相似度阈值""" if 0 <= threshold <= 1: self.similarity_threshold = threshold print(f"相似度阈值已设置为: {threshold}") else: print("阈值必须在 0 到 1 之间") # 使用示例和演示 def demo_usage(): """演示使用方法""" # 创建人脸识别系统 face_system = MultiFaceRecognitionSystem() # 1. 注册人脸到数据库 print("=== 注册人脸 ===") # 创建测试数据目录 os.makedirs("test_data/register", exist_ok=True) os.makedirs("test_data/query", exist_ok=True) os.makedirs("test_data/output", exist_ok=True) # 假设你有一些人脸图像用于注册 # face_system.register_face("test_data/register/person1.jpg", "alice") # face_system.register_face("test_data/register/person2.jpg", "bob") # face_system.register_face("test_data/register/person3.jpg", "charlie") # # 或者批量注册 # face_system.batch_register_faces("test_data/register", "person") # 2. 查看数据库信息 print("\n=== 数据库信息 ===") db_info = face_system.get_database_info() print(f"数据库人数: {db_info['total_persons']}") print(f"人员ID: {db_info['person_ids']}") # 3. 单张图像识别示例 print("\n=== 单张图像识别 ===") query_image = "test_data/query/multi_face.jpg" # 包含多个人的图像 if os.path.exists(query_image): result = face_system.recognize_faces_in_image( image_path=query_image, output_path="test_data/output/annotated_multi_face.jpg", draw_landmarks=True, draw_info=True ) print(f"检测到 {result['faces_detected']} 张人脸:") for face in result['recognitions']: status = "已知" if face['person_id'] != "Unknown" else "未知" print(f" 人脸 {face['face_index'] + 1}: {face['person_id']} " f"(相似度: {face['similarity']:.3f}, {face['gender']}/{face['age']}岁) - {status}") else: print(f"查询图像不存在: {query_image}") print("请将包含多人脸的图像放在 test_data/query/ 目录下") # 4. 批量处理示例 print("\n=== 批量处理图像 ===") if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0: stats = face_system.batch_process_images( input_dir="test_data/query", output_dir="test_data/output/batch_results" ) print(f"批量处理统计:") print(f" 总图像数: {stats['total_images']}") print(f" 成功处理: {stats['processed_images']}") print(f" 总人脸数: {stats['total_faces']}") print(f" 识别出的人脸: {stats['recognized_faces']}") # 5. 调整相似度阈值 print("\n=== 调整阈值 ===") face_system.set_similarity_threshold(0.1) # 降低阈值,更宽松 # face_system.set_similarity_threshold(0.7) # 提高阈值,更严格 if __name__ == "__main__": demo_usage()