496 lines
17 KiB
Python
496 lines
17 KiB
Python
import os
|
|
import cv2
|
|
import numpy as np
|
|
from insightface.app import FaceAnalysis
|
|
import pickle
|
|
from typing import List, Tuple, Dict, Optional
|
|
from datetime import datetime
|
|
import json
|
|
|
|
|
|
class MultiFaceRecognitionSystem:
|
|
"""
|
|
多人脸识别系统
|
|
支持多个人脸检测、识别和标注
|
|
"""
|
|
|
|
def __init__(self, model_name: str = 'buffalo_l'):
|
|
"""
|
|
初始化人脸识别系统
|
|
|
|
Args:
|
|
model_name: 模型名称,可选 'buffalo_l', 'buffalo_s' 等
|
|
"""
|
|
# 初始化InsightFace
|
|
self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
|
|
self.app.prepare(ctx_id=0, det_size=(640, 640))
|
|
|
|
# 人脸数据库
|
|
self.face_database = {}
|
|
self.database_file = "face_database.pkl"
|
|
|
|
# 加载已有数据库
|
|
self.load_database()
|
|
|
|
# 相似度阈值
|
|
self.similarity_threshold = 0.6
|
|
|
|
# 颜色配置
|
|
self.colors = {
|
|
'known': (0, 255, 0), # 绿色 - 已知人脸
|
|
'unknown': (0, 0, 255), # 红色 - 未知人脸
|
|
'text': (255, 255, 255), # 白色 - 文字
|
|
'landmark': (255, 0, 0) # 蓝色 - 关键点
|
|
}
|
|
|
|
def register_face(self, image_path: str, person_id: str) -> bool:
|
|
"""
|
|
注册单个人脸到数据库
|
|
|
|
Args:
|
|
image_path: 图像路径
|
|
person_id: 人员ID
|
|
|
|
Returns:
|
|
bool: 注册是否成功
|
|
"""
|
|
if not os.path.exists(image_path):
|
|
print(f"图像文件不存在: {image_path}")
|
|
return False
|
|
|
|
# 提取人脸特征
|
|
faces = self.extract_face_features(image_path)
|
|
|
|
if not faces:
|
|
print(f"在图像 {image_path} 中未检测到人脸")
|
|
return False
|
|
|
|
if len(faces) > 1:
|
|
print(f"在图像 {image_path} 中检测到多个人脸,将使用第一个人脸")
|
|
|
|
# 存储到数据库
|
|
self.face_database[person_id] = {
|
|
'embedding': np.array(faces[0]['embedding']),
|
|
'image_path': image_path,
|
|
'registration_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
}
|
|
|
|
# 保存数据库
|
|
self.save_database()
|
|
print(f"成功注册人脸: {person_id}")
|
|
return True
|
|
|
|
def batch_register_faces(self, image_dir: str, id_prefix: str = "person") -> int:
|
|
"""
|
|
批量注册人脸
|
|
|
|
Args:
|
|
image_dir: 图像目录
|
|
id_prefix: ID前缀
|
|
|
|
Returns:
|
|
int: 成功注册的数量
|
|
"""
|
|
if not os.path.exists(image_dir):
|
|
print(f"目录不存在: {image_dir}")
|
|
return 0
|
|
|
|
registered_count = 0
|
|
image_files = [f for f in os.listdir(image_dir)
|
|
if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
|
|
|
|
for i, filename in enumerate(image_files):
|
|
image_path = os.path.join(image_dir, filename)
|
|
person_id = f"{id_prefix}_{i + 1:03d}"
|
|
|
|
if self.register_face(image_path, person_id):
|
|
registered_count += 1
|
|
|
|
print(f"批量注册完成: {registered_count}/{len(image_files)} 个人脸")
|
|
return registered_count
|
|
|
|
def extract_face_features(self, image_path: str) -> List[Dict]:
|
|
"""
|
|
从图像中提取所有人脸特征
|
|
|
|
Args:
|
|
image_path: 图像路径
|
|
|
|
Returns:
|
|
List[Dict]: 包含多个人脸信息的列表
|
|
"""
|
|
img = cv2.imread(image_path)
|
|
if img is None:
|
|
raise ValueError(f"无法读取图像: {image_path}")
|
|
|
|
faces = self.app.get(img)
|
|
results = []
|
|
|
|
for i, face in enumerate(faces):
|
|
face_info = {
|
|
'bbox': face.bbox.astype(int).tolist(), # 人脸框 [x1, y1, x2, y2]
|
|
'kps': face.kps.astype(int).tolist(), # 关键点 [[x1,y1], [x2,y2], ...]
|
|
'embedding': face.embedding.tolist(), # 特征向量
|
|
'gender': 'Male' if face.gender == 1 else 'Female',
|
|
'age': int(face.age),
|
|
'det_score': float(face.det_score) # 检测置信度
|
|
}
|
|
results.append(face_info)
|
|
|
|
return results
|
|
|
|
def compare_faces(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
|
|
"""
|
|
计算两个人脸特征的相似度
|
|
|
|
Args:
|
|
embedding1: 特征向量1
|
|
embedding2: 特征向量2
|
|
|
|
Returns:
|
|
float: 余弦相似度 (0-1)
|
|
"""
|
|
# 归一化向量
|
|
embedding1 = embedding1 / np.linalg.norm(embedding1)
|
|
embedding2 = embedding2 / np.linalg.norm(embedding2)
|
|
|
|
# 计算余弦相似度
|
|
similarity = np.dot(embedding1, embedding2)
|
|
return float(similarity)
|
|
|
|
def search_face(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
|
|
"""
|
|
在数据库中搜索最相似的人脸
|
|
|
|
Args:
|
|
query_embedding: 查询特征向量
|
|
top_k: 返回最相似的k个结果
|
|
|
|
Returns:
|
|
List[Tuple[str, float]]: (人员ID, 相似度)
|
|
"""
|
|
if not self.face_database:
|
|
return []
|
|
|
|
results = []
|
|
query_embedding = query_embedding / np.linalg.norm(query_embedding)
|
|
|
|
for person_id, data in self.face_database.items():
|
|
db_embedding = data['embedding'] / np.linalg.norm(data['embedding'])
|
|
similarity = np.dot(query_embedding, db_embedding)
|
|
|
|
if similarity > self.similarity_threshold:
|
|
results.append((person_id, float(similarity)))
|
|
|
|
# 按相似度降序排序
|
|
results.sort(key=lambda x: x[1], reverse=True)
|
|
|
|
# 返回top_k个结果
|
|
return results[:top_k]
|
|
|
|
def recognize_faces_in_image(self, image_path: str, output_path: str = None,
|
|
draw_landmarks: bool = True, draw_info: bool = True) -> Dict:
|
|
"""
|
|
识别图像中的所有脸并进行标注
|
|
|
|
Args:
|
|
image_path: 输入图像路径
|
|
output_path: 输出图像路径
|
|
draw_landmarks: 是否绘制关键点
|
|
draw_info: 是否绘制详细信息
|
|
|
|
Returns:
|
|
Dict: 识别结果
|
|
"""
|
|
# 读取图像
|
|
img = cv2.imread(image_path)
|
|
if img is None:
|
|
raise ValueError(f"无法读取图像: {image_path}")
|
|
|
|
# 提取人脸特征
|
|
faces = self.extract_face_features(image_path)
|
|
recognition_results = {
|
|
'image_path': image_path,
|
|
'faces_detected': len(faces),
|
|
'recognitions': []
|
|
}
|
|
|
|
# 对每个检测到的人脸进行识别
|
|
for i, face in enumerate(faces):
|
|
# 在数据库中搜索
|
|
query_embedding = np.array(face['embedding'])
|
|
search_results = self.search_face(query_embedding, top_k=1)
|
|
|
|
# 识别结果
|
|
person_id = "Unknown"
|
|
similarity = 0.0
|
|
|
|
if search_results:
|
|
person_id, similarity = search_results[0]
|
|
|
|
# 存储识别结果
|
|
face_recognition = {
|
|
'face_index': i,
|
|
'bbox': face['bbox'],
|
|
'person_id': person_id,
|
|
'similarity': similarity,
|
|
'gender': face['gender'],
|
|
'age': face['age'],
|
|
'det_score': face['det_score']
|
|
}
|
|
recognition_results['recognitions'].append(face_recognition)
|
|
|
|
# 在图像上绘制标注
|
|
self._draw_face_annotation(img, face_recognition, draw_landmarks, draw_info)
|
|
|
|
# 保存输出图像
|
|
if output_path:
|
|
os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)
|
|
cv2.imwrite(output_path, img)
|
|
print(f"标注图像已保存: {output_path}")
|
|
|
|
return recognition_results
|
|
|
|
def _draw_face_annotation(self, img: np.ndarray, face_info: Dict,
|
|
draw_landmarks: bool, draw_info: bool):
|
|
"""
|
|
在图像上绘制人脸标注
|
|
|
|
Args:
|
|
img: 图像数组
|
|
face_info: 人脸信息
|
|
draw_landmarks: 是否绘制关键点
|
|
draw_info: 是否绘制详细信息
|
|
"""
|
|
bbox = face_info['bbox'] # [x1, y1, x2, y2]
|
|
person_id = face_info['person_id']
|
|
similarity = face_info['similarity']
|
|
|
|
# 确定颜色:已知人脸用绿色,未知用红色
|
|
if person_id != "Unknown":
|
|
color = self.colors['known']
|
|
label = f"{person_id} ({similarity:.3f})"
|
|
else:
|
|
color = self.colors['unknown']
|
|
label = "Unknown"
|
|
|
|
# 绘制人脸框
|
|
cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
|
|
|
|
# 绘制关键点
|
|
if draw_landmarks and 'kps' in face_info:
|
|
for kp in face_info['kps']:
|
|
cv2.circle(img, (kp[0], kp[1]), 2, self.colors['landmark'], -1)
|
|
|
|
# 绘制信息文本
|
|
if draw_info:
|
|
# 基础信息
|
|
text_y = bbox[1] - 10
|
|
if text_y < 20:
|
|
text_y = bbox[3] + 20
|
|
|
|
cv2.putText(img, label, (bbox[0], text_y),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.colors['text'], 2)
|
|
|
|
# 详细信息
|
|
detail_text = f"{face_info['gender']}/{face_info['age']}"
|
|
cv2.putText(img, detail_text, (bbox[0], text_y + 25),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['text'], 1)
|
|
|
|
def batch_process_images(self, input_dir: str, output_dir: str,
|
|
image_extensions: tuple = ('.jpg', '.jpeg', '.png')) -> Dict:
|
|
"""
|
|
批量处理目录中的所有图像
|
|
|
|
Args:
|
|
input_dir: 输入图像目录
|
|
output_dir: 输出图像目录
|
|
image_extensions: 图像文件扩展名
|
|
|
|
Returns:
|
|
Dict: 批量处理结果统计
|
|
"""
|
|
if not os.path.exists(input_dir):
|
|
raise ValueError(f"输入目录不存在: {input_dir}")
|
|
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
# 获取所有图像文件
|
|
image_files = []
|
|
for ext in image_extensions:
|
|
image_files.extend([f for f in os.listdir(input_dir) if f.lower().endswith(ext)])
|
|
|
|
print(f"找到 {len(image_files)} 个图像文件")
|
|
|
|
# 处理统计
|
|
stats = {
|
|
'total_images': len(image_files),
|
|
'processed_images': 0,
|
|
'total_faces': 0,
|
|
'recognized_faces': 0,
|
|
'results': []
|
|
}
|
|
|
|
# 批量处理
|
|
for image_file in image_files:
|
|
input_path = os.path.join(input_dir, image_file)
|
|
output_path = os.path.join(output_dir, f"annotated_{image_file}")
|
|
|
|
try:
|
|
# 处理单张图像
|
|
result = self.recognize_faces_in_image(input_path, output_path)
|
|
|
|
# 更新统计
|
|
stats['processed_images'] += 1
|
|
stats['total_faces'] += result['faces_detected']
|
|
|
|
recognized = sum(1 for face in result['recognitions'] if face['person_id'] != "Unknown")
|
|
stats['recognized_faces'] += recognized
|
|
|
|
stats['results'].append({
|
|
'image_file': image_file,
|
|
'faces_detected': result['faces_detected'],
|
|
'recognized_faces': recognized
|
|
})
|
|
|
|
print(f"处理完成: {image_file} -> 检测到 {result['faces_detected']} 张脸, 识别出 {recognized} 张")
|
|
|
|
except Exception as e:
|
|
print(f"处理图像 {image_file} 时出错: {e}")
|
|
|
|
# 保存处理统计
|
|
stats_path = os.path.join(output_dir, "processing_stats.json")
|
|
with open(stats_path, 'w', encoding='utf-8') as f:
|
|
json.dump(stats, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"批量处理完成! 统计信息已保存: {stats_path}")
|
|
return stats
|
|
|
|
def save_database(self):
|
|
"""保存人脸数据库到文件"""
|
|
# 转换为可序列化的格式
|
|
save_data = {}
|
|
for person_id, data in self.face_database.items():
|
|
save_data[person_id] = {
|
|
'embedding': data['embedding'].tolist(),
|
|
'image_path': data['image_path'],
|
|
'registration_time': data.get('registration_time', 'Unknown')
|
|
}
|
|
|
|
with open(self.database_file, 'wb') as f:
|
|
pickle.dump(save_data, f)
|
|
|
|
print(f"人脸数据库已保存: {self.database_file} (共 {len(self.face_database)} 人)")
|
|
|
|
def load_database(self):
|
|
"""从文件加载人脸数据库"""
|
|
if os.path.exists(self.database_file):
|
|
try:
|
|
with open(self.database_file, 'rb') as f:
|
|
save_data = pickle.load(f)
|
|
|
|
# 转换回numpy数组
|
|
for person_id, data in save_data.items():
|
|
self.face_database[person_id] = {
|
|
'embedding': np.array(data['embedding']),
|
|
'image_path': data['image_path'],
|
|
'registration_time': data.get('registration_time', 'Unknown')
|
|
}
|
|
|
|
print(f"人脸数据库已加载: {len(self.face_database)} 人")
|
|
except Exception as e:
|
|
print(f"加载数据库失败: {e}")
|
|
self.face_database = {}
|
|
else:
|
|
print("数据库文件不存在,将创建新数据库")
|
|
self.face_database = {}
|
|
|
|
def get_database_info(self) -> Dict:
|
|
"""获取数据库信息"""
|
|
return {
|
|
'total_persons': len(self.face_database),
|
|
'person_ids': list(self.face_database.keys()),
|
|
'database_file': self.database_file
|
|
}
|
|
|
|
def set_similarity_threshold(self, threshold: float):
|
|
"""设置相似度阈值"""
|
|
if 0 <= threshold <= 1:
|
|
self.similarity_threshold = threshold
|
|
print(f"相似度阈值已设置为: {threshold}")
|
|
else:
|
|
print("阈值必须在 0 到 1 之间")
|
|
|
|
|
|
# 使用示例和演示
|
|
def demo_usage():
|
|
"""演示使用方法"""
|
|
|
|
# 创建人脸识别系统
|
|
face_system = MultiFaceRecognitionSystem()
|
|
|
|
# 1. 注册人脸到数据库
|
|
print("=== 注册人脸 ===")
|
|
|
|
# 创建测试数据目录
|
|
os.makedirs("test_data/register", exist_ok=True)
|
|
os.makedirs("test_data/query", exist_ok=True)
|
|
os.makedirs("test_data/output", exist_ok=True)
|
|
|
|
# 假设你有一些人脸图像用于注册
|
|
# face_system.register_face("test_data/register/person1.jpg", "alice")
|
|
# face_system.register_face("test_data/register/person2.jpg", "bob")
|
|
# face_system.register_face("test_data/register/person3.jpg", "charlie")
|
|
|
|
# # 或者批量注册
|
|
# face_system.batch_register_faces("test_data/register", "person")
|
|
|
|
# 2. 查看数据库信息
|
|
print("\n=== 数据库信息 ===")
|
|
db_info = face_system.get_database_info()
|
|
print(f"数据库人数: {db_info['total_persons']}")
|
|
print(f"人员ID: {db_info['person_ids']}")
|
|
|
|
# 3. 单张图像识别示例
|
|
print("\n=== 单张图像识别 ===")
|
|
query_image = "test_data/query/multi_face.jpg" # 包含多个人的图像
|
|
|
|
if os.path.exists(query_image):
|
|
result = face_system.recognize_faces_in_image(
|
|
image_path=query_image,
|
|
output_path="test_data/output/annotated_multi_face.jpg",
|
|
draw_landmarks=True,
|
|
draw_info=True
|
|
)
|
|
|
|
print(f"检测到 {result['faces_detected']} 张人脸:")
|
|
for face in result['recognitions']:
|
|
status = "已知" if face['person_id'] != "Unknown" else "未知"
|
|
print(f" 人脸 {face['face_index'] + 1}: {face['person_id']} "
|
|
f"(相似度: {face['similarity']:.3f}, {face['gender']}/{face['age']}岁) - {status}")
|
|
else:
|
|
print(f"查询图像不存在: {query_image}")
|
|
print("请将包含多人脸的图像放在 test_data/query/ 目录下")
|
|
|
|
# 4. 批量处理示例
|
|
print("\n=== 批量处理图像 ===")
|
|
if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0:
|
|
stats = face_system.batch_process_images(
|
|
input_dir="test_data/query",
|
|
output_dir="test_data/output/batch_results"
|
|
)
|
|
|
|
print(f"批量处理统计:")
|
|
print(f" 总图像数: {stats['total_images']}")
|
|
print(f" 成功处理: {stats['processed_images']}")
|
|
print(f" 总人脸数: {stats['total_faces']}")
|
|
print(f" 识别出的人脸: {stats['recognized_faces']}")
|
|
|
|
# 5. 调整相似度阈值
|
|
print("\n=== 调整阈值 ===")
|
|
face_system.set_similarity_threshold(0.1) # 降低阈值,更宽松
|
|
# face_system.set_similarity_threshold(0.7) # 提高阈值,更严格
|
|
|
|
|
|
if __name__ == "__main__":
|
|
demo_usage() |