备份一些py文件
This commit is contained in:
496
backup/face_recognition_system_2.py
Normal file
496
backup/face_recognition_system_2.py
Normal file
@@ -0,0 +1,496 @@
|
||||
import os
|
||||
import cv2
|
||||
import numpy as np
|
||||
from insightface.app import FaceAnalysis
|
||||
import pickle
|
||||
from typing import List, Tuple, Dict, Optional
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
|
||||
class MultiFaceRecognitionSystem:
|
||||
"""
|
||||
多人脸识别系统
|
||||
支持多个人脸检测、识别和标注
|
||||
"""
|
||||
|
||||
def __init__(self, model_name: str = 'buffalo_l'):
|
||||
"""
|
||||
初始化人脸识别系统
|
||||
|
||||
Args:
|
||||
model_name: 模型名称,可选 'buffalo_l', 'buffalo_s' 等
|
||||
"""
|
||||
# 初始化InsightFace
|
||||
self.app = FaceAnalysis(name=model_name, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
|
||||
self.app.prepare(ctx_id=0, det_size=(640, 640))
|
||||
|
||||
# 人脸数据库
|
||||
self.face_database = {}
|
||||
self.database_file = "face_database.pkl"
|
||||
|
||||
# 加载已有数据库
|
||||
self.load_database()
|
||||
|
||||
# 相似度阈值
|
||||
self.similarity_threshold = 0.6
|
||||
|
||||
# 颜色配置
|
||||
self.colors = {
|
||||
'known': (0, 255, 0), # 绿色 - 已知人脸
|
||||
'unknown': (0, 0, 255), # 红色 - 未知人脸
|
||||
'text': (255, 255, 255), # 白色 - 文字
|
||||
'landmark': (255, 0, 0) # 蓝色 - 关键点
|
||||
}
|
||||
|
||||
def register_face(self, image_path: str, person_id: str) -> bool:
|
||||
"""
|
||||
注册单个人脸到数据库
|
||||
|
||||
Args:
|
||||
image_path: 图像路径
|
||||
person_id: 人员ID
|
||||
|
||||
Returns:
|
||||
bool: 注册是否成功
|
||||
"""
|
||||
if not os.path.exists(image_path):
|
||||
print(f"图像文件不存在: {image_path}")
|
||||
return False
|
||||
|
||||
# 提取人脸特征
|
||||
faces = self.extract_face_features(image_path)
|
||||
|
||||
if not faces:
|
||||
print(f"在图像 {image_path} 中未检测到人脸")
|
||||
return False
|
||||
|
||||
if len(faces) > 1:
|
||||
print(f"在图像 {image_path} 中检测到多个人脸,将使用第一个人脸")
|
||||
|
||||
# 存储到数据库
|
||||
self.face_database[person_id] = {
|
||||
'embedding': np.array(faces[0]['embedding']),
|
||||
'image_path': image_path,
|
||||
'registration_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
|
||||
# 保存数据库
|
||||
self.save_database()
|
||||
print(f"成功注册人脸: {person_id}")
|
||||
return True
|
||||
|
||||
def batch_register_faces(self, image_dir: str, id_prefix: str = "person") -> int:
|
||||
"""
|
||||
批量注册人脸
|
||||
|
||||
Args:
|
||||
image_dir: 图像目录
|
||||
id_prefix: ID前缀
|
||||
|
||||
Returns:
|
||||
int: 成功注册的数量
|
||||
"""
|
||||
if not os.path.exists(image_dir):
|
||||
print(f"目录不存在: {image_dir}")
|
||||
return 0
|
||||
|
||||
registered_count = 0
|
||||
image_files = [f for f in os.listdir(image_dir)
|
||||
if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
|
||||
|
||||
for i, filename in enumerate(image_files):
|
||||
image_path = os.path.join(image_dir, filename)
|
||||
person_id = f"{id_prefix}_{i + 1:03d}"
|
||||
|
||||
if self.register_face(image_path, person_id):
|
||||
registered_count += 1
|
||||
|
||||
print(f"批量注册完成: {registered_count}/{len(image_files)} 个人脸")
|
||||
return registered_count
|
||||
|
||||
def extract_face_features(self, image_path: str) -> List[Dict]:
|
||||
"""
|
||||
从图像中提取所有人脸特征
|
||||
|
||||
Args:
|
||||
image_path: 图像路径
|
||||
|
||||
Returns:
|
||||
List[Dict]: 包含多个人脸信息的列表
|
||||
"""
|
||||
img = cv2.imread(image_path)
|
||||
if img is None:
|
||||
raise ValueError(f"无法读取图像: {image_path}")
|
||||
|
||||
faces = self.app.get(img)
|
||||
results = []
|
||||
|
||||
for i, face in enumerate(faces):
|
||||
face_info = {
|
||||
'bbox': face.bbox.astype(int).tolist(), # 人脸框 [x1, y1, x2, y2]
|
||||
'kps': face.kps.astype(int).tolist(), # 关键点 [[x1,y1], [x2,y2], ...]
|
||||
'embedding': face.embedding.tolist(), # 特征向量
|
||||
'gender': 'Male' if face.gender == 1 else 'Female',
|
||||
'age': int(face.age),
|
||||
'det_score': float(face.det_score) # 检测置信度
|
||||
}
|
||||
results.append(face_info)
|
||||
|
||||
return results
|
||||
|
||||
def compare_faces(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
|
||||
"""
|
||||
计算两个人脸特征的相似度
|
||||
|
||||
Args:
|
||||
embedding1: 特征向量1
|
||||
embedding2: 特征向量2
|
||||
|
||||
Returns:
|
||||
float: 余弦相似度 (0-1)
|
||||
"""
|
||||
# 归一化向量
|
||||
embedding1 = embedding1 / np.linalg.norm(embedding1)
|
||||
embedding2 = embedding2 / np.linalg.norm(embedding2)
|
||||
|
||||
# 计算余弦相似度
|
||||
similarity = np.dot(embedding1, embedding2)
|
||||
return float(similarity)
|
||||
|
||||
def search_face(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
|
||||
"""
|
||||
在数据库中搜索最相似的人脸
|
||||
|
||||
Args:
|
||||
query_embedding: 查询特征向量
|
||||
top_k: 返回最相似的k个结果
|
||||
|
||||
Returns:
|
||||
List[Tuple[str, float]]: (人员ID, 相似度)
|
||||
"""
|
||||
if not self.face_database:
|
||||
return []
|
||||
|
||||
results = []
|
||||
query_embedding = query_embedding / np.linalg.norm(query_embedding)
|
||||
|
||||
for person_id, data in self.face_database.items():
|
||||
db_embedding = data['embedding'] / np.linalg.norm(data['embedding'])
|
||||
similarity = np.dot(query_embedding, db_embedding)
|
||||
|
||||
if similarity > self.similarity_threshold:
|
||||
results.append((person_id, float(similarity)))
|
||||
|
||||
# 按相似度降序排序
|
||||
results.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
# 返回top_k个结果
|
||||
return results[:top_k]
|
||||
|
||||
def recognize_faces_in_image(self, image_path: str, output_path: str = None,
|
||||
draw_landmarks: bool = True, draw_info: bool = True) -> Dict:
|
||||
"""
|
||||
识别图像中的所有脸并进行标注
|
||||
|
||||
Args:
|
||||
image_path: 输入图像路径
|
||||
output_path: 输出图像路径
|
||||
draw_landmarks: 是否绘制关键点
|
||||
draw_info: 是否绘制详细信息
|
||||
|
||||
Returns:
|
||||
Dict: 识别结果
|
||||
"""
|
||||
# 读取图像
|
||||
img = cv2.imread(image_path)
|
||||
if img is None:
|
||||
raise ValueError(f"无法读取图像: {image_path}")
|
||||
|
||||
# 提取人脸特征
|
||||
faces = self.extract_face_features(image_path)
|
||||
recognition_results = {
|
||||
'image_path': image_path,
|
||||
'faces_detected': len(faces),
|
||||
'recognitions': []
|
||||
}
|
||||
|
||||
# 对每个检测到的人脸进行识别
|
||||
for i, face in enumerate(faces):
|
||||
# 在数据库中搜索
|
||||
query_embedding = np.array(face['embedding'])
|
||||
search_results = self.search_face(query_embedding, top_k=1)
|
||||
|
||||
# 识别结果
|
||||
person_id = "Unknown"
|
||||
similarity = 0.0
|
||||
|
||||
if search_results:
|
||||
person_id, similarity = search_results[0]
|
||||
|
||||
# 存储识别结果
|
||||
face_recognition = {
|
||||
'face_index': i,
|
||||
'bbox': face['bbox'],
|
||||
'person_id': person_id,
|
||||
'similarity': similarity,
|
||||
'gender': face['gender'],
|
||||
'age': face['age'],
|
||||
'det_score': face['det_score']
|
||||
}
|
||||
recognition_results['recognitions'].append(face_recognition)
|
||||
|
||||
# 在图像上绘制标注
|
||||
self._draw_face_annotation(img, face_recognition, draw_landmarks, draw_info)
|
||||
|
||||
# 保存输出图像
|
||||
if output_path:
|
||||
os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)
|
||||
cv2.imwrite(output_path, img)
|
||||
print(f"标注图像已保存: {output_path}")
|
||||
|
||||
return recognition_results
|
||||
|
||||
def _draw_face_annotation(self, img: np.ndarray, face_info: Dict,
|
||||
draw_landmarks: bool, draw_info: bool):
|
||||
"""
|
||||
在图像上绘制人脸标注
|
||||
|
||||
Args:
|
||||
img: 图像数组
|
||||
face_info: 人脸信息
|
||||
draw_landmarks: 是否绘制关键点
|
||||
draw_info: 是否绘制详细信息
|
||||
"""
|
||||
bbox = face_info['bbox'] # [x1, y1, x2, y2]
|
||||
person_id = face_info['person_id']
|
||||
similarity = face_info['similarity']
|
||||
|
||||
# 确定颜色:已知人脸用绿色,未知用红色
|
||||
if person_id != "Unknown":
|
||||
color = self.colors['known']
|
||||
label = f"{person_id} ({similarity:.3f})"
|
||||
else:
|
||||
color = self.colors['unknown']
|
||||
label = "Unknown"
|
||||
|
||||
# 绘制人脸框
|
||||
cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
|
||||
|
||||
# 绘制关键点
|
||||
if draw_landmarks and 'kps' in face_info:
|
||||
for kp in face_info['kps']:
|
||||
cv2.circle(img, (kp[0], kp[1]), 2, self.colors['landmark'], -1)
|
||||
|
||||
# 绘制信息文本
|
||||
if draw_info:
|
||||
# 基础信息
|
||||
text_y = bbox[1] - 10
|
||||
if text_y < 20:
|
||||
text_y = bbox[3] + 20
|
||||
|
||||
cv2.putText(img, label, (bbox[0], text_y),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.colors['text'], 2)
|
||||
|
||||
# 详细信息
|
||||
detail_text = f"{face_info['gender']}/{face_info['age']}"
|
||||
cv2.putText(img, detail_text, (bbox[0], text_y + 25),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['text'], 1)
|
||||
|
||||
def batch_process_images(self, input_dir: str, output_dir: str,
|
||||
image_extensions: tuple = ('.jpg', '.jpeg', '.png')) -> Dict:
|
||||
"""
|
||||
批量处理目录中的所有图像
|
||||
|
||||
Args:
|
||||
input_dir: 输入图像目录
|
||||
output_dir: 输出图像目录
|
||||
image_extensions: 图像文件扩展名
|
||||
|
||||
Returns:
|
||||
Dict: 批量处理结果统计
|
||||
"""
|
||||
if not os.path.exists(input_dir):
|
||||
raise ValueError(f"输入目录不存在: {input_dir}")
|
||||
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# 获取所有图像文件
|
||||
image_files = []
|
||||
for ext in image_extensions:
|
||||
image_files.extend([f for f in os.listdir(input_dir) if f.lower().endswith(ext)])
|
||||
|
||||
print(f"找到 {len(image_files)} 个图像文件")
|
||||
|
||||
# 处理统计
|
||||
stats = {
|
||||
'total_images': len(image_files),
|
||||
'processed_images': 0,
|
||||
'total_faces': 0,
|
||||
'recognized_faces': 0,
|
||||
'results': []
|
||||
}
|
||||
|
||||
# 批量处理
|
||||
for image_file in image_files:
|
||||
input_path = os.path.join(input_dir, image_file)
|
||||
output_path = os.path.join(output_dir, f"annotated_{image_file}")
|
||||
|
||||
try:
|
||||
# 处理单张图像
|
||||
result = self.recognize_faces_in_image(input_path, output_path)
|
||||
|
||||
# 更新统计
|
||||
stats['processed_images'] += 1
|
||||
stats['total_faces'] += result['faces_detected']
|
||||
|
||||
recognized = sum(1 for face in result['recognitions'] if face['person_id'] != "Unknown")
|
||||
stats['recognized_faces'] += recognized
|
||||
|
||||
stats['results'].append({
|
||||
'image_file': image_file,
|
||||
'faces_detected': result['faces_detected'],
|
||||
'recognized_faces': recognized
|
||||
})
|
||||
|
||||
print(f"处理完成: {image_file} -> 检测到 {result['faces_detected']} 张脸, 识别出 {recognized} 张")
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理图像 {image_file} 时出错: {e}")
|
||||
|
||||
# 保存处理统计
|
||||
stats_path = os.path.join(output_dir, "processing_stats.json")
|
||||
with open(stats_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(stats, f, indent=2, ensure_ascii=False)
|
||||
|
||||
print(f"批量处理完成! 统计信息已保存: {stats_path}")
|
||||
return stats
|
||||
|
||||
def save_database(self):
|
||||
"""保存人脸数据库到文件"""
|
||||
# 转换为可序列化的格式
|
||||
save_data = {}
|
||||
for person_id, data in self.face_database.items():
|
||||
save_data[person_id] = {
|
||||
'embedding': data['embedding'].tolist(),
|
||||
'image_path': data['image_path'],
|
||||
'registration_time': data.get('registration_time', 'Unknown')
|
||||
}
|
||||
|
||||
with open(self.database_file, 'wb') as f:
|
||||
pickle.dump(save_data, f)
|
||||
|
||||
print(f"人脸数据库已保存: {self.database_file} (共 {len(self.face_database)} 人)")
|
||||
|
||||
def load_database(self):
|
||||
"""从文件加载人脸数据库"""
|
||||
if os.path.exists(self.database_file):
|
||||
try:
|
||||
with open(self.database_file, 'rb') as f:
|
||||
save_data = pickle.load(f)
|
||||
|
||||
# 转换回numpy数组
|
||||
for person_id, data in save_data.items():
|
||||
self.face_database[person_id] = {
|
||||
'embedding': np.array(data['embedding']),
|
||||
'image_path': data['image_path'],
|
||||
'registration_time': data.get('registration_time', 'Unknown')
|
||||
}
|
||||
|
||||
print(f"人脸数据库已加载: {len(self.face_database)} 人")
|
||||
except Exception as e:
|
||||
print(f"加载数据库失败: {e}")
|
||||
self.face_database = {}
|
||||
else:
|
||||
print("数据库文件不存在,将创建新数据库")
|
||||
self.face_database = {}
|
||||
|
||||
def get_database_info(self) -> Dict:
|
||||
"""获取数据库信息"""
|
||||
return {
|
||||
'total_persons': len(self.face_database),
|
||||
'person_ids': list(self.face_database.keys()),
|
||||
'database_file': self.database_file
|
||||
}
|
||||
|
||||
def set_similarity_threshold(self, threshold: float):
|
||||
"""设置相似度阈值"""
|
||||
if 0 <= threshold <= 1:
|
||||
self.similarity_threshold = threshold
|
||||
print(f"相似度阈值已设置为: {threshold}")
|
||||
else:
|
||||
print("阈值必须在 0 到 1 之间")
|
||||
|
||||
|
||||
# 使用示例和演示
|
||||
def demo_usage():
|
||||
"""演示使用方法"""
|
||||
|
||||
# 创建人脸识别系统
|
||||
face_system = MultiFaceRecognitionSystem()
|
||||
|
||||
# 1. 注册人脸到数据库
|
||||
print("=== 注册人脸 ===")
|
||||
|
||||
# 创建测试数据目录
|
||||
os.makedirs("test_data/register", exist_ok=True)
|
||||
os.makedirs("test_data/query", exist_ok=True)
|
||||
os.makedirs("test_data/output", exist_ok=True)
|
||||
|
||||
# 假设你有一些人脸图像用于注册
|
||||
# face_system.register_face("test_data/register/person1.jpg", "alice")
|
||||
# face_system.register_face("test_data/register/person2.jpg", "bob")
|
||||
# face_system.register_face("test_data/register/person3.jpg", "charlie")
|
||||
|
||||
# # 或者批量注册
|
||||
# face_system.batch_register_faces("test_data/register", "person")
|
||||
|
||||
# 2. 查看数据库信息
|
||||
print("\n=== 数据库信息 ===")
|
||||
db_info = face_system.get_database_info()
|
||||
print(f"数据库人数: {db_info['total_persons']}")
|
||||
print(f"人员ID: {db_info['person_ids']}")
|
||||
|
||||
# 3. 单张图像识别示例
|
||||
print("\n=== 单张图像识别 ===")
|
||||
query_image = "test_data/query/multi_face.jpg" # 包含多个人的图像
|
||||
|
||||
if os.path.exists(query_image):
|
||||
result = face_system.recognize_faces_in_image(
|
||||
image_path=query_image,
|
||||
output_path="test_data/output/annotated_multi_face.jpg",
|
||||
draw_landmarks=True,
|
||||
draw_info=True
|
||||
)
|
||||
|
||||
print(f"检测到 {result['faces_detected']} 张人脸:")
|
||||
for face in result['recognitions']:
|
||||
status = "已知" if face['person_id'] != "Unknown" else "未知"
|
||||
print(f" 人脸 {face['face_index'] + 1}: {face['person_id']} "
|
||||
f"(相似度: {face['similarity']:.3f}, {face['gender']}/{face['age']}岁) - {status}")
|
||||
else:
|
||||
print(f"查询图像不存在: {query_image}")
|
||||
print("请将包含多人脸的图像放在 test_data/query/ 目录下")
|
||||
|
||||
# 4. 批量处理示例
|
||||
print("\n=== 批量处理图像 ===")
|
||||
if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0:
|
||||
stats = face_system.batch_process_images(
|
||||
input_dir="test_data/query",
|
||||
output_dir="test_data/output/batch_results"
|
||||
)
|
||||
|
||||
print(f"批量处理统计:")
|
||||
print(f" 总图像数: {stats['total_images']}")
|
||||
print(f" 成功处理: {stats['processed_images']}")
|
||||
print(f" 总人脸数: {stats['total_faces']}")
|
||||
print(f" 识别出的人脸: {stats['recognized_faces']}")
|
||||
|
||||
# 5. 调整相似度阈值
|
||||
print("\n=== 调整阈值 ===")
|
||||
face_system.set_similarity_threshold(0.1) # 降低阈值,更宽松
|
||||
# face_system.set_similarity_threshold(0.7) # 提高阈值,更严格
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
demo_usage()
|
||||
Reference in New Issue
Block a user