Files
SupervisorAI/backup/face_recognition_system_2.py
2025-12-20 18:07:49 +08:00

496 lines
17 KiB
Python

import os
import cv2
import numpy as np
from insightface.app import FaceAnalysis
import pickle
from typing import List, Tuple, Dict, Optional
from datetime import datetime
import json
class MultiFaceRecognitionSystem:
"""
多人脸识别系统
支持多个人脸检测、识别和标注
"""
def __init__(self, model_name: str = 'buffalo_l'):
"""
初始化人脸识别系统
Args:
model_name: 模型名称,可选 'buffalo_l', 'buffalo_s'
"""
# 初始化InsightFace
self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
self.app.prepare(ctx_id=0, det_size=(640, 640))
# 人脸数据库
self.face_database = {}
self.database_file = "face_database.pkl"
# 加载已有数据库
self.load_database()
# 相似度阈值
self.similarity_threshold = 0.6
# 颜色配置
self.colors = {
'known': (0, 255, 0), # 绿色 - 已知人脸
'unknown': (0, 0, 255), # 红色 - 未知人脸
'text': (255, 255, 255), # 白色 - 文字
'landmark': (255, 0, 0) # 蓝色 - 关键点
}
def register_face(self, image_path: str, person_id: str) -> bool:
"""
注册单个人脸到数据库
Args:
image_path: 图像路径
person_id: 人员ID
Returns:
bool: 注册是否成功
"""
if not os.path.exists(image_path):
print(f"图像文件不存在: {image_path}")
return False
# 提取人脸特征
faces = self.extract_face_features(image_path)
if not faces:
print(f"在图像 {image_path} 中未检测到人脸")
return False
if len(faces) > 1:
print(f"在图像 {image_path} 中检测到多个人脸,将使用第一个人脸")
# 存储到数据库
self.face_database[person_id] = {
'embedding': np.array(faces[0]['embedding']),
'image_path': image_path,
'registration_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# 保存数据库
self.save_database()
print(f"成功注册人脸: {person_id}")
return True
def batch_register_faces(self, image_dir: str, id_prefix: str = "person") -> int:
"""
批量注册人脸
Args:
image_dir: 图像目录
id_prefix: ID前缀
Returns:
int: 成功注册的数量
"""
if not os.path.exists(image_dir):
print(f"目录不存在: {image_dir}")
return 0
registered_count = 0
image_files = [f for f in os.listdir(image_dir)
if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
for i, filename in enumerate(image_files):
image_path = os.path.join(image_dir, filename)
person_id = f"{id_prefix}_{i + 1:03d}"
if self.register_face(image_path, person_id):
registered_count += 1
print(f"批量注册完成: {registered_count}/{len(image_files)} 个人脸")
return registered_count
def extract_face_features(self, image_path: str) -> List[Dict]:
"""
从图像中提取所有人脸特征
Args:
image_path: 图像路径
Returns:
List[Dict]: 包含多个人脸信息的列表
"""
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
faces = self.app.get(img)
results = []
for i, face in enumerate(faces):
face_info = {
'bbox': face.bbox.astype(int).tolist(), # 人脸框 [x1, y1, x2, y2]
'kps': face.kps.astype(int).tolist(), # 关键点 [[x1,y1], [x2,y2], ...]
'embedding': face.embedding.tolist(), # 特征向量
'gender': 'Male' if face.gender == 1 else 'Female',
'age': int(face.age),
'det_score': float(face.det_score) # 检测置信度
}
results.append(face_info)
return results
def compare_faces(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
"""
计算两个人脸特征的相似度
Args:
embedding1: 特征向量1
embedding2: 特征向量2
Returns:
float: 余弦相似度 (0-1)
"""
# 归一化向量
embedding1 = embedding1 / np.linalg.norm(embedding1)
embedding2 = embedding2 / np.linalg.norm(embedding2)
# 计算余弦相似度
similarity = np.dot(embedding1, embedding2)
return float(similarity)
def search_face(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
"""
在数据库中搜索最相似的人脸
Args:
query_embedding: 查询特征向量
top_k: 返回最相似的k个结果
Returns:
List[Tuple[str, float]]: (人员ID, 相似度)
"""
if not self.face_database:
return []
results = []
query_embedding = query_embedding / np.linalg.norm(query_embedding)
for person_id, data in self.face_database.items():
db_embedding = data['embedding'] / np.linalg.norm(data['embedding'])
similarity = np.dot(query_embedding, db_embedding)
if similarity > self.similarity_threshold:
results.append((person_id, float(similarity)))
# 按相似度降序排序
results.sort(key=lambda x: x[1], reverse=True)
# 返回top_k个结果
return results[:top_k]
def recognize_faces_in_image(self, image_path: str, output_path: str = None,
draw_landmarks: bool = True, draw_info: bool = True) -> Dict:
"""
识别图像中的所有脸并进行标注
Args:
image_path: 输入图像路径
output_path: 输出图像路径
draw_landmarks: 是否绘制关键点
draw_info: 是否绘制详细信息
Returns:
Dict: 识别结果
"""
# 读取图像
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
# 提取人脸特征
faces = self.extract_face_features(image_path)
recognition_results = {
'image_path': image_path,
'faces_detected': len(faces),
'recognitions': []
}
# 对每个检测到的人脸进行识别
for i, face in enumerate(faces):
# 在数据库中搜索
query_embedding = np.array(face['embedding'])
search_results = self.search_face(query_embedding, top_k=1)
# 识别结果
person_id = "Unknown"
similarity = 0.0
if search_results:
person_id, similarity = search_results[0]
# 存储识别结果
face_recognition = {
'face_index': i,
'bbox': face['bbox'],
'person_id': person_id,
'similarity': similarity,
'gender': face['gender'],
'age': face['age'],
'det_score': face['det_score']
}
recognition_results['recognitions'].append(face_recognition)
# 在图像上绘制标注
self._draw_face_annotation(img, face_recognition, draw_landmarks, draw_info)
# 保存输出图像
if output_path:
os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)
cv2.imwrite(output_path, img)
print(f"标注图像已保存: {output_path}")
return recognition_results
def _draw_face_annotation(self, img: np.ndarray, face_info: Dict,
draw_landmarks: bool, draw_info: bool):
"""
在图像上绘制人脸标注
Args:
img: 图像数组
face_info: 人脸信息
draw_landmarks: 是否绘制关键点
draw_info: 是否绘制详细信息
"""
bbox = face_info['bbox'] # [x1, y1, x2, y2]
person_id = face_info['person_id']
similarity = face_info['similarity']
# 确定颜色:已知人脸用绿色,未知用红色
if person_id != "Unknown":
color = self.colors['known']
label = f"{person_id} ({similarity:.3f})"
else:
color = self.colors['unknown']
label = "Unknown"
# 绘制人脸框
cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
# 绘制关键点
if draw_landmarks and 'kps' in face_info:
for kp in face_info['kps']:
cv2.circle(img, (kp[0], kp[1]), 2, self.colors['landmark'], -1)
# 绘制信息文本
if draw_info:
# 基础信息
text_y = bbox[1] - 10
if text_y < 20:
text_y = bbox[3] + 20
cv2.putText(img, label, (bbox[0], text_y),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.colors['text'], 2)
# 详细信息
detail_text = f"{face_info['gender']}/{face_info['age']}"
cv2.putText(img, detail_text, (bbox[0], text_y + 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['text'], 1)
def batch_process_images(self, input_dir: str, output_dir: str,
image_extensions: tuple = ('.jpg', '.jpeg', '.png')) -> Dict:
"""
批量处理目录中的所有图像
Args:
input_dir: 输入图像目录
output_dir: 输出图像目录
image_extensions: 图像文件扩展名
Returns:
Dict: 批量处理结果统计
"""
if not os.path.exists(input_dir):
raise ValueError(f"输入目录不存在: {input_dir}")
os.makedirs(output_dir, exist_ok=True)
# 获取所有图像文件
image_files = []
for ext in image_extensions:
image_files.extend([f for f in os.listdir(input_dir) if f.lower().endswith(ext)])
print(f"找到 {len(image_files)} 个图像文件")
# 处理统计
stats = {
'total_images': len(image_files),
'processed_images': 0,
'total_faces': 0,
'recognized_faces': 0,
'results': []
}
# 批量处理
for image_file in image_files:
input_path = os.path.join(input_dir, image_file)
output_path = os.path.join(output_dir, f"annotated_{image_file}")
try:
# 处理单张图像
result = self.recognize_faces_in_image(input_path, output_path)
# 更新统计
stats['processed_images'] += 1
stats['total_faces'] += result['faces_detected']
recognized = sum(1 for face in result['recognitions'] if face['person_id'] != "Unknown")
stats['recognized_faces'] += recognized
stats['results'].append({
'image_file': image_file,
'faces_detected': result['faces_detected'],
'recognized_faces': recognized
})
print(f"处理完成: {image_file} -> 检测到 {result['faces_detected']} 张脸, 识别出 {recognized}")
except Exception as e:
print(f"处理图像 {image_file} 时出错: {e}")
# 保存处理统计
stats_path = os.path.join(output_dir, "processing_stats.json")
with open(stats_path, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=2, ensure_ascii=False)
print(f"批量处理完成! 统计信息已保存: {stats_path}")
return stats
def save_database(self):
"""保存人脸数据库到文件"""
# 转换为可序列化的格式
save_data = {}
for person_id, data in self.face_database.items():
save_data[person_id] = {
'embedding': data['embedding'].tolist(),
'image_path': data['image_path'],
'registration_time': data.get('registration_time', 'Unknown')
}
with open(self.database_file, 'wb') as f:
pickle.dump(save_data, f)
print(f"人脸数据库已保存: {self.database_file} (共 {len(self.face_database)} 人)")
def load_database(self):
"""从文件加载人脸数据库"""
if os.path.exists(self.database_file):
try:
with open(self.database_file, 'rb') as f:
save_data = pickle.load(f)
# 转换回numpy数组
for person_id, data in save_data.items():
self.face_database[person_id] = {
'embedding': np.array(data['embedding']),
'image_path': data['image_path'],
'registration_time': data.get('registration_time', 'Unknown')
}
print(f"人脸数据库已加载: {len(self.face_database)}")
except Exception as e:
print(f"加载数据库失败: {e}")
self.face_database = {}
else:
print("数据库文件不存在,将创建新数据库")
self.face_database = {}
def get_database_info(self) -> Dict:
"""获取数据库信息"""
return {
'total_persons': len(self.face_database),
'person_ids': list(self.face_database.keys()),
'database_file': self.database_file
}
def set_similarity_threshold(self, threshold: float):
"""设置相似度阈值"""
if 0 <= threshold <= 1:
self.similarity_threshold = threshold
print(f"相似度阈值已设置为: {threshold}")
else:
print("阈值必须在 0 到 1 之间")
# 使用示例和演示
def demo_usage():
"""演示使用方法"""
# 创建人脸识别系统
face_system = MultiFaceRecognitionSystem()
# 1. 注册人脸到数据库
print("=== 注册人脸 ===")
# 创建测试数据目录
os.makedirs("test_data/register", exist_ok=True)
os.makedirs("test_data/query", exist_ok=True)
os.makedirs("test_data/output", exist_ok=True)
# 假设你有一些人脸图像用于注册
# face_system.register_face("test_data/register/person1.jpg", "alice")
# face_system.register_face("test_data/register/person2.jpg", "bob")
# face_system.register_face("test_data/register/person3.jpg", "charlie")
# # 或者批量注册
# face_system.batch_register_faces("test_data/register", "person")
# 2. 查看数据库信息
print("\n=== 数据库信息 ===")
db_info = face_system.get_database_info()
print(f"数据库人数: {db_info['total_persons']}")
print(f"人员ID: {db_info['person_ids']}")
# 3. 单张图像识别示例
print("\n=== 单张图像识别 ===")
query_image = "test_data/query/multi_face.jpg" # 包含多个人的图像
if os.path.exists(query_image):
result = face_system.recognize_faces_in_image(
image_path=query_image,
output_path="test_data/output/annotated_multi_face.jpg",
draw_landmarks=True,
draw_info=True
)
print(f"检测到 {result['faces_detected']} 张人脸:")
for face in result['recognitions']:
status = "已知" if face['person_id'] != "Unknown" else "未知"
print(f" 人脸 {face['face_index'] + 1}: {face['person_id']} "
f"(相似度: {face['similarity']:.3f}, {face['gender']}/{face['age']}岁) - {status}")
else:
print(f"查询图像不存在: {query_image}")
print("请将包含多人脸的图像放在 test_data/query/ 目录下")
# 4. 批量处理示例
print("\n=== 批量处理图像 ===")
if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0:
stats = face_system.batch_process_images(
input_dir="test_data/query",
output_dir="test_data/output/batch_results"
)
print(f"批量处理统计:")
print(f" 总图像数: {stats['total_images']}")
print(f" 成功处理: {stats['processed_images']}")
print(f" 总人脸数: {stats['total_faces']}")
print(f" 识别出的人脸: {stats['recognized_faces']}")
# 5. 调整相似度阈值
print("\n=== 调整阈值 ===")
face_system.set_similarity_threshold(0.1) # 降低阈值,更宽松
# face_system.set_similarity_threshold(0.7) # 提高阈值,更严格
if __name__ == "__main__":
demo_usage()