From 76895da6600e640a607b322c88c0f6f4971422eb Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Sun, 21 Dec 2025 11:43:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81=EF=BC=8C?= =?UTF-8?q?=E6=8A=8A=E7=AE=97=E6=B3=95=E5=92=8C=E4=B8=9A=E5=8A=A1=E5=89=A5?= =?UTF-8?q?=E7=A6=BB=E5=BC=80=E6=9D=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/api/routes/algorithm_router.py | 2 +- src/base_face_biz.py | 516 +++++++++++++++++++++++++++++ src/face_recognition_algorithm.py | 493 +-------------------------- 3 files changed, 528 insertions(+), 483 deletions(-) create mode 100644 src/base_face_biz.py diff --git a/src/api/routes/algorithm_router.py b/src/api/routes/algorithm_router.py index 5ea26d7..651270b 100644 --- a/src/api/routes/algorithm_router.py +++ b/src/api/routes/algorithm_router.py @@ -95,7 +95,7 @@ def process_feature_calculation(feature_id: int) -> bool: # 提取人脸特征 try: - feature_vector = face_algorithm.extract_face_feature(str(image_path)) + feature_vector = face_algorithm.create_business().extract_face_feature(str(image_path)) if feature_vector is not None: # 转换为二进制数据 diff --git a/src/base_face_biz.py b/src/base_face_biz.py new file mode 100644 index 0000000..3f25d32 --- /dev/null +++ b/src/base_face_biz.py @@ -0,0 +1,516 @@ +""" +人脸识别业务基类 +处理具体的业务逻辑,包括参数配置、人脸匹配、质量评估等 +""" + +import cv2 +import numpy as np +import time +from typing import List, Dict, Tuple, Optional +import os +from insightface.app import FaceAnalysis + + +class BaseFaceBiz: + """ + 人脸识别业务基类 + 处理具体的业务逻辑,与底层算法分离 + """ + + def __init__(self, face_analysis: FaceAnalysis): + """ + 初始化业务类 + + 参数: + face_analysis: 已初始化好的FaceAnalysis实例 + """ + self.app = face_analysis + + # 业务参数配置 + self.list_mode = "blacklist" # "blacklist" 或 "whitelist" + self.clarity_threshold = 100.0 # 清晰度阈值,低于此值认为人脸模糊 + self.min_face_size = 20 # 最小人脸像素尺寸 + self.pitch_threshold = 90 # 俯仰角阈值 + self.yaw_threshold = 90 # 偏航角阈值 + self.similarity_threshold = 0.3 # 相似度阈值 + + # 名单相关变量 + self.registered_faces = {} # {name: embedding} + + def set_list_mode(self, mode: str): + """设置名单模式""" + if mode.lower() in ["blacklist", "whitelist"]: + self.list_mode = mode.lower() + print(f"✅ 名单模式设置为: {self.list_mode}") + else: + print("❌ 无效的名单模式,请使用 'blacklist' 或 'whitelist'") + + def get_list_mode(self) -> str: + """获取当前名单模式""" + return self.list_mode + + def set_clarity_threshold(self, threshold: float): + """设置清晰度阈值""" + self.clarity_threshold = threshold + print(f"✅ 清晰度阈值设置为: {threshold}") + + def get_clarity_threshold(self) -> float: + """获取清晰度阈值""" + return self.clarity_threshold + + def set_min_face_size(self, size: int): + """设置最小人脸尺寸""" + self.min_face_size = size + print(f"✅ 最小人脸尺寸设置为: {size}") + + def get_min_face_size(self) -> int: + """获取最小人脸尺寸""" + return self.min_face_size + + def set_pitch_threshold(self, threshold: float): + """设置俯仰角阈值""" + self.pitch_threshold = threshold + print(f"✅ 俯仰角阈值设置为: {threshold}") + + def get_pitch_threshold(self) -> float: + """获取俯仰角阈值""" + return self.pitch_threshold + + def set_yaw_threshold(self, threshold: float): + """设置偏航角阈值""" + self.yaw_threshold = threshold + print(f"✅ 偏航角阈值设置为: {threshold}") + + def get_yaw_threshold(self) -> float: + """获取偏航角阈值""" + return self.yaw_threshold + + def set_similarity_threshold(self, threshold: float): + """设置相似度阈值""" + self.similarity_threshold = threshold + print(f"✅ 相似度阈值设置为: {threshold}") + + def get_similarity_threshold(self) -> float: + """获取相似度阈值""" + return self.similarity_threshold + + def set_registered_faces(self, registered_faces: Dict[str, np.ndarray]): + """ + 直接设置已注册的人脸数据 + + 参数: + registered_faces: 字典格式 {name: embedding} + """ + if not isinstance(registered_faces, dict): + print("❌ 参数必须是字典格式 {name: embedding}") + return False + + # 验证数据格式 + valid_count = 0 + for name, embedding in registered_faces.items(): + if isinstance(embedding, np.ndarray) and embedding.size > 0: + valid_count += 1 + + if valid_count == 0: + print("❌ 未找到有效的人脸嵌入数据") + return False + + self.registered_faces = registered_faces + print(f"✅ 成功设置 {valid_count} 个注册人脸") + return True + + def get_registered_faces(self) -> Dict[str, np.ndarray]: + """获取已注册的人脸数据""" + return self.registered_faces + + def get_registered_face_count(self) -> int: + """获取已注册人脸数量""" + return len(self.registered_faces) + + def load_registered_faces(self, register_dir: str): + """ + 从目录加载注册的人脸图片 + 文件名(去掉后缀)即为人的名字 + """ + import glob + + if not os.path.exists(register_dir): + print(f"❌ 注册目录不存在: {register_dir}") + return False + + # 支持的图片格式 + image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp'] + image_files = [] + + for ext in image_extensions: + image_files.extend(glob.glob(os.path.join(register_dir, ext))) + image_files.extend(glob.glob(os.path.join(register_dir, ext.upper()))) + + if not image_files: + print(f"❌ 在目录 {register_dir} 中未找到图片文件") + return False + + loaded_count = 0 + for image_path in image_files: + # 获取文件名(不含扩展名)作为人名 + person_name = os.path.splitext(os.path.basename(image_path))[0] + + # 读取图片并提取人脸特征 + img = cv2.imread(image_path) + if img is None: + print(f"❌ 无法读取图片: {image_path}") + continue + + faces = self.app.get(img) + if not faces: + print(f"❌ 图片中未检测到人脸: {image_path}") + continue + + # 使用第一张检测到的人脸 + self.registered_faces[person_name] = faces[0].embedding + loaded_count += 1 + print(f"✅ 加载注册人脸: {person_name}") + + print(f"🎉 成功加载 {loaded_count} 张注册人脸") + return loaded_count > 0 + + def find_best_match(self, embedding: np.ndarray) -> Tuple[Optional[str], float]: + """ + 在注册人脸中查找最佳匹配 + + 返回: + (匹配的人名, 相似度) + """ + if not self.registered_faces: + return None, 0.0 + + best_similarity = 0.0 + best_name = None + + # 归一化查询嵌入 + query_emb = embedding / np.linalg.norm(embedding) + + for name, registered_embedding in self.registered_faces.items(): + # 归一化注册嵌入 + reg_emb = registered_embedding / np.linalg.norm(registered_embedding) + + # 计算余弦相似度 + similarity = float(np.dot(query_emb, reg_emb)) + + if similarity > best_similarity: + best_similarity = similarity + best_name = name + + return best_name, best_similarity + + def calculate_clarity(self, face_region: np.ndarray) -> float: + """ + 计算人脸区域的清晰度/模糊度 + 使用拉普拉斯方差方法:值越高表示图像越清晰 + """ + if len(face_region.shape) == 3: + gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY) + else: + gray = face_region + + # 计算拉普拉斯算子的方差 + laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var() + return laplacian_var + + def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]: + """ + 综合判断人脸质量是否可接受 + + 返回: + (是否可接受, 质量指标字典) + """ + quality_metrics = {} + is_acceptable = True + + # 1. 检测置信度 + quality_metrics['det_score'] = float(face.det_score) + + # 2. 人脸姿态角度 + if hasattr(face, 'pose') and face.pose is not None: + pitch, yaw, roll = face.pose + quality_metrics['pitch'] = float(pitch) + quality_metrics['yaw'] = float(yaw) + quality_metrics['roll'] = float(roll) + else: + quality_metrics['pitch'] = 100.0 + quality_metrics['yaw'] = 100.0 + quality_metrics['roll'] = 100.0 + + # 3. 人脸边界框信息 + bbox = face.bbox + x1, y1, x2, y2 = bbox.astype(int) + width = x2 - x1 + height = y2 - y1 + quality_metrics['bbox_width'] = width + quality_metrics['bbox_height'] = height + quality_metrics['bbox_area'] = width * height + quality_metrics['aspect_ratio'] = width / height if height > 0 else 0 + + # 4. 图像清晰度检测 + h, w = frame.shape[:2] + x1_clip = max(0, x1) + y1_clip = max(0, y1) + x2_clip = min(w, x2) + y2_clip = min(h, y2) + + if x2_clip > x1_clip and y2_clip > y1_clip: + face_region = frame[y1_clip:y2_clip, x1_clip:x2_clip] + clarity_score = self.calculate_clarity(face_region) + quality_metrics['clarity_score'] = clarity_score + else: + quality_metrics['clarity_score'] = 0.0 + + # 5. 综合质量评分 + base_score = quality_metrics['det_score'] + + # 清晰度惩罚 + if quality_metrics['clarity_score'] < self.clarity_threshold: + is_acceptable = False + + # 姿态惩罚 + if abs(quality_metrics['yaw']) > self.yaw_threshold: + is_acceptable = False + if abs(quality_metrics['pitch']) > self.pitch_threshold: + is_acceptable = False + + # 尺寸惩罚 + if min(width, height) < self.min_face_size: + is_acceptable = False + + quality_metrics['quality_score'] = base_score + + return is_acceptable, quality_metrics + + def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict], float]: + """ + 处理单帧图像 + + 返回: + (原始帧, 识别结果列表, 处理时间ms) + """ + start_time = time.time() + + # 人脸检测和识别 + faces = self.app.get(frame) + + results = [] + for face in faces: + # 检查人脸质量是否可接受 + is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame) + + # 查找最佳匹配 + best_name, similarity = self.find_best_match(face.embedding) + + # 根据名单模式判断是否匹配 + if self.list_mode == "blacklist": + # 黑名单模式:在黑名单中即为匹配(需要关注) + is_match = best_name is not None and similarity >= self.similarity_threshold + else: # whitelist + # 白名单模式:在白名单中即为匹配(允许通过) + is_match = best_name is not None and similarity >= self.similarity_threshold + + result = { + 'bbox': face.bbox.astype(int).tolist(), + 'similarity': similarity, + 'best_match': best_name, + 'is_match': is_match, + 'det_score': float(face.det_score), + 'quality_metrics': quality_metrics, + 'is_acceptable': is_acceptable + } + results.append(result) + + processing_time = (time.time() - start_time) * 1000 + return frame, results, processing_time + + def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray: + """在帧上绘制检测结果和质量信息""" + bbox = result['bbox'] + similarity = result['similarity'] + is_match = result['is_match'] + is_acceptable = result['is_acceptable'] + quality_metrics = result['quality_metrics'] + best_match = result['best_match'] + + # 选择颜色 + if not is_acceptable: + color = (128, 128, 128) # 灰色 - 质量不可接受 + else: + # 选择颜色 - 根据名单模式 + if self.list_mode == "blacklist": + # 黑名单模式:匹配(在黑名单中)显示红色,不匹配显示绿色 + color = (0, 0, 255) if is_match else (0, 255, 0) # 红色-黑名单, 绿色-正常 + else: # whitelist + # 白名单模式:匹配(在白名单中)显示绿色,不匹配显示红色 + color = (0, 255, 0) if is_match else (0, 0, 255) # 绿色-白名单, 红色-陌生人 + + # 绘制人脸框 + x1, y1, x2, y2 = bbox + cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) + + # 准备显示文本 + text_lines = [] + + # 第一行:匹配状态 + if not is_acceptable: + text_lines.append("LOW QUALITY") + else: + status = f"MATCH: {best_match}: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}" + text_lines.append(status) + + # 第二行:质量得分 + text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}") + + # 第三行:检测得分 + text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}") + + # 第四行:清晰度 + text_lines.append(f"Clarity: {quality_metrics['clarity_score']:.1f}") + + # 第五行:姿态角度 + text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°") + text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°") + + # 计算文本区域大小 + max_text_width = 0 + total_text_height = 0 + line_heights = [] + + for line in text_lines: + (text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1) + max_text_width = max(max_text_width, text_width) + line_heights.append(text_height + baseline) + total_text_height += text_height + baseline + 2 + + # 绘制文本背景 + bg_x1 = x1 + bg_y1 = y1 - total_text_height - 10 + bg_x2 = x1 + max_text_width + 10 + bg_y2 = y1 + + # 如果背景超出图像顶部,调整到框下方 + if bg_y1 < 0: + bg_y1 = y2 + bg_y2 = y2 + total_text_height + 10 + + # 绘制半透明背景 + overlay = frame.copy() + cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1) + alpha = 0.6 + cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame) + + # 绘制文本 + current_y = bg_y1 + 15 + for i, line in enumerate(text_lines): + # 根据内容选择颜色 + if i == 0: # 状态行 + if not is_acceptable: + text_color = (128, 128, 128) # 灰色 - 质量差 + elif is_match: + text_color = (0, 255, 0) # 绿色 - 匹配 + else: + text_color = (0, 0, 255) # 红色 - 不匹配 + elif i == 3: # 清晰度行 + if quality_metrics['clarity_score'] >= self.clarity_threshold: + text_color = (255, 255, 255) + else: + text_color = (0, 0, 255) + elif i == 4: # pitch + if abs(quality_metrics['pitch']) > self.pitch_threshold: + text_color = (0, 0, 255) + else: + text_color = (255, 255, 255) + elif i == 5: # yaw + if abs(quality_metrics['yaw']) > self.yaw_threshold: + text_color = (0, 0, 255) + else: + text_color = (255, 255, 255) + else: + text_color = (255, 255, 255) # 白色 - 其他信息 + + cv2.putText(frame, line, (x1 + 5, current_y), + cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1) + current_y += line_heights[i] + + return frame + + def draw_detections(self, frame: np.ndarray, results: List[Dict]) -> np.ndarray: + """绘制所有检测结果""" + for result in results: + frame = self._draw_detection(frame, result) + return frame + + def extract_face_feature(self, image_path: str) -> Optional[np.ndarray]: + """ + 从单张图片中提取人脸特征值 + + 参数: + image_path: 人脸图片路径 + + 返回: + numpy数组格式的人脸特征值,如果检测失败返回None + """ + if not os.path.exists(image_path): + print(f"❌ 图片文件不存在: {image_path}") + return None + + # 读取图片 + img = cv2.imread(image_path) + if img is None: + print(f"❌ 无法读取图片: {image_path}") + return None + + # 人脸检测 + faces = self.app.get(img) + if not faces: + print(f"❌ 图片中未检测到人脸: {image_path}") + return None + + # 使用第一张检测到的人脸 + face = faces[0] + + # 检查人脸质量 + is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, img) + + if not is_acceptable: + print(f"⚠️ 人脸质量不可接受: {image_path}") + print(f" 质量得分: {quality_metrics['quality_score']:.3f}") + print(f" 清晰度: {quality_metrics['clarity_score']:.1f}") + print(f" 姿态角度: pitch={quality_metrics['pitch']:.1f}°, yaw={quality_metrics['yaw']:.1f}°") + return None + + print(f"✅ 成功提取人脸特征: {image_path}") + print(f" 检测得分: {quality_metrics['det_score']:.3f}") + print(f" 质量得分: {quality_metrics['quality_score']:.3f}") + + # 返回特征向量 + return face.embedding + + def extract_face_features_batch(self, image_paths: List[str]) -> Dict[str, Optional[np.ndarray]]: + """ + 批量从多张图片中提取人脸特征值 + + 参数: + image_paths: 人脸图片路径列表 + + 返回: + 字典格式 {图片路径: 特征值},失败的特征值为None + """ + results = {} + + for image_path in image_paths: + feature = self.extract_face_feature(image_path) + results[image_path] = feature + + # 统计结果 + success_count = sum(1 for feature in results.values() if feature is not None) + total_count = len(image_paths) + + print(f"🎉 批量提取完成: 成功 {success_count}/{total_count} 张图片") + + return results \ No newline at end of file diff --git a/src/face_recognition_algorithm.py b/src/face_recognition_algorithm.py index 54796f5..e335dd8 100644 --- a/src/face_recognition_algorithm.py +++ b/src/face_recognition_algorithm.py @@ -1,17 +1,16 @@ # face_recognition_algorithm.py import cv2 import numpy as np -import time from insightface.app import FaceAnalysis from typing import List, Dict, Tuple, Optional -import os -import glob + +from src.base_face_biz import BaseFaceBiz class FaceRecognitionAlgorithm: """ 人脸识别核心算法类 - 包含人脸检测、识别、质量评估等核心算法 + 负责模型加载和设备配置,与业务逻辑分离 """ def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True, use_npu: bool = False, @@ -39,18 +38,9 @@ class FaceRecognitionAlgorithm: ) } - # 质量阈值设置 + # 算法参数设置 self.det_size = 640 # 320快速 640中等 1280慢 - - # 默认配置 - blacklist - self.list_mode = "blacklist" # "blacklist" 或 "whitelist" self.det_threshold = 0.5 # 人脸置信度 - self.clarity_threshold = 100.0 # 清晰度阈值,低于此值认为人脸模糊 - self.min_face_size = 20 # 最小人脸像素尺寸 - self.pitch_threshold = 90 # - self.yaw_threshold = 90 # - self.quality_threshold = 0.6 # 质量得分阈值 - self.similarity_threshold = 0.3 # 根据设备类型选择配置 if use_npu: @@ -73,488 +63,27 @@ class FaceRecognitionAlgorithm: det_size=(self.det_size, self.det_size) ) - # 名单相关变量 - self.registered_faces = {} # {name: embedding} - print(f"✅ 人脸识别算法初始化完成 - 设备: {device_type.upper()}") - def set_list_mode(self, mode: str): - """设置名单模式""" - if mode.lower() in ["blacklist", "whitelist"]: - self.list_mode = mode.lower() - print(f"✅ 名单模式设置为: {self.list_mode}") - else: - print("❌ 无效的名单模式,请使用 'blacklist' 或 'whitelist'") - def set_det_threshold(self, threshold: float): """设置检测阈值""" self.det_threshold = threshold print(f"✅ 检测阈值设置为: {threshold}") - def set_similarity_threshold(self, threshold: float): - """设置相似度阈值""" - self.similarity_threshold = threshold - print(f"✅ 相似度阈值设置为: {threshold}") - - def set_pitch_threshold(self, threshold: float): - """设置俯仰角阈值""" - self.pitch_threshold = threshold - print(f"✅ 俯仰角阈值设置为: {threshold}") - - def set_yaw_threshold(self, threshold: float): - """设置偏航角阈值""" - self.yaw_threshold = threshold - print(f"✅ 偏航角阈值设置为: {threshold}") - def set_det_size(self, size: int): """设置检测尺寸""" self.det_size = size print(f"✅ 检测尺寸设置为: {size}") - def set_registered_faces(self, registered_faces: Dict[str, np.ndarray]): + def create_business(self) -> BaseFaceBiz: """ - 直接设置已注册的人脸数据 - 参数: registered_faces - 字典格式 {name: embedding} - """ - if not isinstance(registered_faces, dict): - print("❌ 参数必须是字典格式 {name: embedding}") - return False + 创建业务实例 - # 验证数据格式 - valid_count = 0 - for name, embedding in registered_faces.items(): - if isinstance(embedding, np.ndarray) and embedding.size > 0: - valid_count += 1 - - if valid_count == 0: - print("❌ 未找到有效的人脸嵌入数据") - return False - - self.registered_faces = registered_faces - print(f"✅ 成功设置 {valid_count} 个注册人脸") - return True - - def load_registered_faces(self, register_dir: str): - """ - 从目录加载注册的人脸图片 - 文件名(去掉后缀)即为人的名字 - """ - if not os.path.exists(register_dir): - print(f"❌ 注册目录不存在: {register_dir}") - return False - - # 支持的图片格式 - image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp'] - image_files = [] - - for ext in image_extensions: - image_files.extend(glob.glob(os.path.join(register_dir, ext))) - image_files.extend(glob.glob(os.path.join(register_dir, ext.upper()))) - - if not image_files: - print(f"❌ 在目录 {register_dir} 中未找到图片文件") - return False - - loaded_count = 0 - for image_path in image_files: - # 获取文件名(不含扩展名)作为人名 - person_name = os.path.splitext(os.path.basename(image_path))[0] - - # 读取图片并提取人脸特征 - img = cv2.imread(image_path) - if img is None: - print(f"❌ 无法读取图片: {image_path}") - continue - - faces = self.app.get(img) - if not faces: - print(f"❌ 图片中未检测到人脸: {image_path}") - continue - - # 使用第一张检测到的人脸 - self.registered_faces[person_name] = faces[0].embedding - loaded_count += 1 - print(f"✅ 加载注册人脸: {person_name}") - - print(f"🎉 成功加载 {loaded_count} 张注册人脸") - return loaded_count > 0 - - def find_best_match(self, embedding: np.ndarray) -> Tuple[Optional[str], float]: - """ - 在注册人脸中查找最佳匹配 - 返回: (匹配的人名, 相似度) - """ - if not self.registered_faces: - return None, 0.0 - - best_similarity = 0.0 - best_name = None - - # 归一化查询嵌入 - query_emb = embedding / np.linalg.norm(embedding) - - for name, registered_embedding in self.registered_faces.items(): - # 归一化注册嵌入 - reg_emb = registered_embedding / np.linalg.norm(registered_embedding) - - # 计算余弦相似度 - similarity = float(np.dot(query_emb, reg_emb)) - - if similarity > best_similarity: - best_similarity = similarity - best_name = name - - return best_name, best_similarity - - def calculate_clarity(self, face_region: np.ndarray) -> float: - """ - 计算人脸区域的清晰度/模糊度 - 使用拉普拉斯方差方法:值越高表示图像越清晰 - """ - if len(face_region.shape) == 3: - gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY) - else: - gray = face_region - - # 计算拉普拉斯算子的方差 - laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var() - return laplacian_var - - def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]: - """ - 综合判断人脸质量是否可接受 - 返回: (是否可接受, 质量指标字典) - """ - quality_metrics = {} - - is_acceptable = True - - # 1. 检测置信度 - quality_metrics['det_score'] = float(face.det_score) - - # 2. 人脸姿态角度 - if hasattr(face, 'pose') and face.pose is not None: - pitch, yaw, roll = face.pose - quality_metrics['pitch'] = float(pitch) - quality_metrics['yaw'] = float(yaw) - quality_metrics['roll'] = float(roll) - else: - quality_metrics['pitch'] = 100.0 - quality_metrics['yaw'] = 100.0 - quality_metrics['roll'] = 100.0 - - # 3. 人脸边界框信息 - bbox = face.bbox - x1, y1, x2, y2 = bbox.astype(int) - width = x2 - x1 - height = y2 - y1 - quality_metrics['bbox_width'] = width - quality_metrics['bbox_height'] = height - quality_metrics['bbox_area'] = width * height - quality_metrics['aspect_ratio'] = width / height if height > 0 else 0 - - # 4. 图像清晰度检测 - # 提取人脸区域 - h, w = frame.shape[:2] - x1_clip = max(0, x1) - y1_clip = max(0, y1) - x2_clip = min(w, x2) - y2_clip = min(h, y2) - - if x2_clip > x1_clip and y2_clip > y1_clip: - face_region = frame[y1_clip:y2_clip, x1_clip:x2_clip] - clarity_score = self.calculate_clarity(face_region) - quality_metrics['clarity_score'] = clarity_score - else: - quality_metrics['clarity_score'] = 0.0 - - # 5. 综合质量评分 - base_score = quality_metrics['det_score'] - - # 清晰度惩罚 - clarity_penalty = 0.0 - if quality_metrics['clarity_score'] < self.clarity_threshold: - clarity_penalty = 0.3 # 清晰度不足严重惩罚 - is_acceptable = False - - # 姿态惩罚 - pose_penalty = 0.0 - if abs(quality_metrics['yaw']) > self.yaw_threshold: - pose_penalty += 0.2 - is_acceptable = False - if abs(quality_metrics['pitch']) > self.pitch_threshold: - pose_penalty += 0.2 - is_acceptable = False - - # 尺寸惩罚 - size_penalty = 0.0 - if min(width, height) < self.min_face_size: - is_acceptable = False - size_penalty = 0.2 - - quality_metrics['quality_score'] = max(0.1, base_score - clarity_penalty - pose_penalty - size_penalty) - - return is_acceptable, quality_metrics - - def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]: - """ - 处理单帧图像 - 返回: (原始帧, 识别结果列表) - """ - start_time = time.time() - - # 人脸检测和识别 - faces = self.app.get(frame) - - results = [] - for face in faces: - # 检查人脸质量是否可接受 - is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame) - - # 查找最佳匹配 - best_name, similarity = self.find_best_match(face.embedding) - - # 根据名单模式判断是否匹配 - if self.list_mode == "blacklist": - # 黑名单模式:在黑名单中即为匹配(需要关注) - is_match = best_name is not None and similarity >= self.similarity_threshold - else: # whitelist - # 白名单模式:在白名单中即为匹配(允许通过) - is_match = best_name is not None and similarity >= self.similarity_threshold - - result = { - 'bbox': face.bbox.astype(int).tolist(), - 'similarity': similarity, - 'best_match': best_name, - 'is_match': is_match, - 'det_score': float(face.det_score), - 'quality_metrics': quality_metrics, - 'is_acceptable': is_acceptable # 新增:是否可接受标志 - } - results.append(result) - - processing_time = (time.time() - start_time) * 1000 - return frame, results, processing_time - - def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray: - """在帧上绘制检测结果和质量信息""" - bbox = result['bbox'] - similarity = result['similarity'] - is_match = result['is_match'] - is_acceptable = result['is_acceptable'] - quality_metrics = result['quality_metrics'] - best_match = result['best_match'] - - # 选择颜色 - if not is_acceptable: - color = (128, 128, 128) # 灰色 - 质量不可接受 - else: - # 选择颜色 - 根据名单模式 - if self.list_mode == "blacklist": - # 黑名单模式:匹配(在黑名单中)显示红色,不匹配显示绿色 - color = (0, 0, 255) if is_match else (0, 255, 0) # 红色-黑名单, 绿色-正常 - else: # whitelist - # 白名单模式:匹配(在白名单中)显示绿色,不匹配显示红色 - color = (0, 255, 0) if is_match else (0, 0, 255) # 绿色-白名单, 红色-陌生人 - - # 绘制人脸框 - x1, y1, x2, y2 = bbox - cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) - - # 准备显示文本 - 只保留关键信息 - text_lines = [] - - # 第一行:匹配状态 - if not is_acceptable: - text_lines.append("LOW QUALITY") - else: - status = f"MATCH: {best_match}: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}" - text_lines.append(status) - - # 第二行:质量得分(根据阈值显示颜色) - text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}") - - # 第三行:检测得分 - text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}") - - # 第四行:清晰度 - text_lines.append(f"Clarity: {quality_metrics['clarity_score']:.1f}") - - # 第五行:姿态角度 - text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°") - text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°") - # text_lines.append(f"Roll: {quality_metrics['roll']:.1f}°") - - text_lines.append(f"Width: {quality_metrics['bbox_width']:.1f}") - text_lines.append(f"Height: {quality_metrics['bbox_height']:.1f}") - - # 计算文本区域大小 - max_text_width = 0 - total_text_height = 0 - line_heights = [] - - for line in text_lines: - (text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1) - max_text_width = max(max_text_width, text_width) - line_heights.append(text_height + baseline) - total_text_height += text_height + baseline + 2 - - # 绘制文本背景 - bg_x1 = x1 - bg_y1 = y1 - total_text_height - 10 - bg_x2 = x1 + max_text_width + 10 - bg_y2 = y1 - - # 如果背景超出图像顶部,调整到框下方 - if bg_y1 < 0: - bg_y1 = y2 - bg_y2 = y2 + total_text_height + 10 - - # 绘制半透明背景 - overlay = frame.copy() - cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1) - alpha = 0.6 - cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame) - - # 绘制文本 - current_y = bg_y1 + 15 - for i, line in enumerate(text_lines): - # 根据内容选择颜色 - 按照你的要求简化颜色规则 - if i == 0: # 状态行 - if not is_acceptable: - text_color = (128, 128, 128) # 灰色 - 质量差 - elif is_match: - text_color = (0, 255, 0) # 绿色 - 匹配 - else: - text_color = (0, 0, 255) # 红色 - 不匹配 - elif i == 3: # 清晰度行 - # 清晰度:低于阈值红色,大于等于阈值绿色 - if quality_metrics['clarity_score'] >= self.clarity_threshold: - text_color = (255, 255, 255) - else: - text_color = (0, 0, 255) - elif i == 4: # pitch - if abs(quality_metrics['pitch']) > self.pitch_threshold: - text_color = (0, 0, 255) # 红色 - else: - text_color = (255, 255, 255) - elif i == 5: # yaw - if abs(quality_metrics['yaw']) > self.yaw_threshold: - text_color = (0, 0, 255) # 红色 - else: - text_color = (255, 255, 255) - elif i == 6: # 宽度 - if quality_metrics['bbox_width'] < self.min_face_size: - text_color = (0, 0, 255) # 红色 - else: - text_color = (255, 255, 255) - elif i == 7: # 高度 - if quality_metrics['bbox_height'] < self.min_face_size: - text_color = (0, 0, 255) # 红色 - else: - text_color = (255, 255, 255) - else: - text_color = (255, 255, 255) # 白色 - 其他信息 - - cv2.putText(frame, line, (x1 + 5, current_y), - cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1) - current_y += line_heights[i] - - return frame - - def draw_detections(self, frame: np.ndarray, results: List[Dict]) -> np.ndarray: - """绘制所有检测结果""" - for result in results: - frame = self._draw_detection(frame, result) - return frame - - def set_quality_thresholds(self, clarity_threshold: float = None, - quality_threshold: float = None, - min_face_size: int = None): - """设置质量阈值""" - if clarity_threshold is not None: - self.clarity_threshold = clarity_threshold - if quality_threshold is not None: - self.quality_threshold = quality_threshold - if min_face_size is not None: - self.min_face_size = min_face_size - print( - f"✅ 质量阈值更新 - 清晰度: {self.clarity_threshold}, 质量得分: {self.quality_threshold}, 最小尺寸: {self.min_face_size}") - - def get_registered_face_count(self) -> int: - """获取已注册人脸数量""" - return len(self.registered_faces) - - def get_list_mode(self) -> str: - """获取当前名单模式""" - return self.list_mode - - def extract_face_feature(self, image_path: str) -> Optional[np.ndarray]: - """ - 从单张图片中提取人脸特征值 - - 参数: - image_path: 人脸图片路径 - 返回: - numpy数组格式的人脸特征值,如果检测失败返回None + BaseFaceBiz实例,用于处理具体的业务逻辑 """ - if not os.path.exists(image_path): - print(f"❌ 图片文件不存在: {image_path}") - return None - - # 读取图片 - img = cv2.imread(image_path) - if img is None: - print(f"❌ 无法读取图片: {image_path}") - return None - - # 人脸检测 - faces = self.app.get(img) - if not faces: - print(f"❌ 图片中未检测到人脸: {image_path}") - return None - - # 使用第一张检测到的人脸 - face = faces[0] - - # 检查人脸质量 - is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, img) - - if not is_acceptable: - print(f"⚠️ 人脸质量不可接受: {image_path}") - print(f" 质量得分: {quality_metrics['quality_score']:.3f}") - print(f" 清晰度: {quality_metrics['clarity_score']:.1f}") - print(f" 姿态角度: pitch={quality_metrics['pitch']:.1f}°, yaw={quality_metrics['yaw']:.1f}°") - return None - - print(f"✅ 成功提取人脸特征: {image_path}") - print(f" 检测得分: {quality_metrics['det_score']:.3f}") - print(f" 质量得分: {quality_metrics['quality_score']:.3f}") - - # 返回特征向量 - return face.embedding + return BaseFaceBiz(self.app) - def extract_face_features_batch(self, image_paths: List[str]) -> Dict[str, Optional[np.ndarray]]: - """ - 批量从多张图片中提取人脸特征值 - - 参数: - image_paths: 人脸图片路径列表 - - 返回: - 字典格式 {图片路径: 特征值},失败的特征值为None - """ - results = {} - - for image_path in image_paths: - feature = self.extract_face_feature(image_path) - results[image_path] = feature - - # 统计结果 - success_count = sum(1 for feature in results.values() if feature is not None) - total_count = len(image_paths) - - print(f"🎉 批量提取完成: 成功 {success_count}/{total_count} 张图片") - - return results \ No newline at end of file + def get_app(self) -> FaceAnalysis: + """获取底层的FaceAnalysis实例""" + return self.app \ No newline at end of file