将算法独立出来
This commit is contained in:
467
src/face_recognition_algorithm.py
Normal file
467
src/face_recognition_algorithm.py
Normal file
@@ -0,0 +1,467 @@
|
|||||||
|
# face_recognition_algorithm.py
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
from insightface.app import FaceAnalysis
|
||||||
|
from typing import List, Dict, Tuple, Optional
|
||||||
|
import os
|
||||||
|
import glob
|
||||||
|
|
||||||
|
|
||||||
|
class FaceRecognitionAlgorithm:
|
||||||
|
"""
|
||||||
|
人脸识别核心算法类
|
||||||
|
包含人脸检测、识别、质量评估等核心算法
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True, use_npu: bool = False,
|
||||||
|
npu_device_id: int = 0):
|
||||||
|
# 设备配置映射(NPU采用华为指定的完整参数)
|
||||||
|
self.DEVICE_CONFIG = {
|
||||||
|
"cpu": (['CPUExecutionProvider'], -1),
|
||||||
|
"gpu": (['CUDAExecutionProvider'], 0),
|
||||||
|
"npu": (
|
||||||
|
[
|
||||||
|
(
|
||||||
|
"CANNExecutionProvider",
|
||||||
|
{
|
||||||
|
"device_id": npu_device_id,
|
||||||
|
"arena_extend_strategy": "kNextPowerOfTwo",
|
||||||
|
"npu_mem_limit": 16 * 1024 * 1024 * 1024,
|
||||||
|
"op_select_impl_mode": "high_precision",
|
||||||
|
"precision_mode": "allow_fp32_to_fp16",
|
||||||
|
"enable_cann_graph": True,
|
||||||
|
},
|
||||||
|
),
|
||||||
|
"CPUExecutionProvider"
|
||||||
|
],
|
||||||
|
npu_device_id
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
# 质量阈值设置
|
||||||
|
self.det_size = 640 # 320快速 640中等 1280慢
|
||||||
|
|
||||||
|
# 默认配置 - blacklist
|
||||||
|
self.list_mode = "blacklist" # "blacklist" 或 "whitelist"
|
||||||
|
self.det_threshold = 0.5 # 人脸置信度
|
||||||
|
self.clarity_threshold = 100.0 # 清晰度阈值,低于此值认为人脸模糊
|
||||||
|
self.min_face_size = 20 # 最小人脸像素尺寸
|
||||||
|
self.pitch_threshold = 90 #
|
||||||
|
self.yaw_threshold = 90 #
|
||||||
|
self.quality_threshold = 0.6 # 质量得分阈值
|
||||||
|
self.similarity_threshold = 0.3
|
||||||
|
|
||||||
|
# 根据设备类型选择配置
|
||||||
|
if use_npu:
|
||||||
|
device_type = "npu"
|
||||||
|
print(f"✅ 使用NPU设备,设备ID: {npu_device_id}")
|
||||||
|
elif use_gpu:
|
||||||
|
device_type = "gpu"
|
||||||
|
print("✅ 使用GPU设备")
|
||||||
|
else:
|
||||||
|
device_type = "cpu"
|
||||||
|
print("✅ 使用CPU设备")
|
||||||
|
|
||||||
|
providers, ctx_id = self.DEVICE_CONFIG[device_type]
|
||||||
|
|
||||||
|
# 初始化人脸识别模型
|
||||||
|
self.app = FaceAnalysis(name=model_name, providers=providers)
|
||||||
|
self.app.prepare(
|
||||||
|
ctx_id=ctx_id,
|
||||||
|
det_thresh=self.det_threshold,
|
||||||
|
det_size=(self.det_size, self.det_size)
|
||||||
|
)
|
||||||
|
|
||||||
|
# 名单相关变量
|
||||||
|
self.registered_faces = {} # {name: embedding}
|
||||||
|
|
||||||
|
print(f"✅ 人脸识别算法初始化完成 - 设备: {device_type.upper()}")
|
||||||
|
|
||||||
|
def set_list_mode(self, mode: str):
|
||||||
|
"""设置名单模式"""
|
||||||
|
if mode.lower() in ["blacklist", "whitelist"]:
|
||||||
|
self.list_mode = mode.lower()
|
||||||
|
print(f"✅ 名单模式设置为: {self.list_mode}")
|
||||||
|
else:
|
||||||
|
print("❌ 无效的名单模式,请使用 'blacklist' 或 'whitelist'")
|
||||||
|
|
||||||
|
def set_det_threshold(self, threshold: float):
|
||||||
|
"""设置检测阈值"""
|
||||||
|
self.det_threshold = threshold
|
||||||
|
print(f"✅ 检测阈值设置为: {threshold}")
|
||||||
|
|
||||||
|
def set_similarity_threshold(self, threshold: float):
|
||||||
|
"""设置相似度阈值"""
|
||||||
|
self.similarity_threshold = threshold
|
||||||
|
print(f"✅ 相似度阈值设置为: {threshold}")
|
||||||
|
|
||||||
|
def set_pitch_threshold(self, threshold: float):
|
||||||
|
"""设置俯仰角阈值"""
|
||||||
|
self.pitch_threshold = threshold
|
||||||
|
print(f"✅ 俯仰角阈值设置为: {threshold}")
|
||||||
|
|
||||||
|
def set_yaw_threshold(self, threshold: float):
|
||||||
|
"""设置偏航角阈值"""
|
||||||
|
self.yaw_threshold = threshold
|
||||||
|
print(f"✅ 偏航角阈值设置为: {threshold}")
|
||||||
|
|
||||||
|
def set_det_size(self, size: int):
|
||||||
|
"""设置检测尺寸"""
|
||||||
|
self.det_size = size
|
||||||
|
print(f"✅ 检测尺寸设置为: {size}")
|
||||||
|
|
||||||
|
def load_registered_faces(self, register_dir: str):
|
||||||
|
"""
|
||||||
|
从目录加载注册的人脸图片
|
||||||
|
文件名(去掉后缀)即为人的名字
|
||||||
|
"""
|
||||||
|
if not os.path.exists(register_dir):
|
||||||
|
print(f"❌ 注册目录不存在: {register_dir}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 支持的图片格式
|
||||||
|
image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp']
|
||||||
|
image_files = []
|
||||||
|
|
||||||
|
for ext in image_extensions:
|
||||||
|
image_files.extend(glob.glob(os.path.join(register_dir, ext)))
|
||||||
|
image_files.extend(glob.glob(os.path.join(register_dir, ext.upper())))
|
||||||
|
|
||||||
|
if not image_files:
|
||||||
|
print(f"❌ 在目录 {register_dir} 中未找到图片文件")
|
||||||
|
return False
|
||||||
|
|
||||||
|
loaded_count = 0
|
||||||
|
for image_path in image_files:
|
||||||
|
# 获取文件名(不含扩展名)作为人名
|
||||||
|
person_name = os.path.splitext(os.path.basename(image_path))[0]
|
||||||
|
|
||||||
|
# 读取图片并提取人脸特征
|
||||||
|
img = cv2.imread(image_path)
|
||||||
|
if img is None:
|
||||||
|
print(f"❌ 无法读取图片: {image_path}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
faces = self.app.get(img)
|
||||||
|
if not faces:
|
||||||
|
print(f"❌ 图片中未检测到人脸: {image_path}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 使用第一张检测到的人脸
|
||||||
|
self.registered_faces[person_name] = faces[0].embedding
|
||||||
|
loaded_count += 1
|
||||||
|
print(f"✅ 加载注册人脸: {person_name}")
|
||||||
|
|
||||||
|
print(f"🎉 成功加载 {loaded_count} 张注册人脸")
|
||||||
|
return loaded_count > 0
|
||||||
|
|
||||||
|
def find_best_match(self, embedding: np.ndarray) -> Tuple[Optional[str], float]:
|
||||||
|
"""
|
||||||
|
在注册人脸中查找最佳匹配
|
||||||
|
返回: (匹配的人名, 相似度)
|
||||||
|
"""
|
||||||
|
if not self.registered_faces:
|
||||||
|
return None, 0.0
|
||||||
|
|
||||||
|
best_similarity = 0.0
|
||||||
|
best_name = None
|
||||||
|
|
||||||
|
# 归一化查询嵌入
|
||||||
|
query_emb = embedding / np.linalg.norm(embedding)
|
||||||
|
|
||||||
|
for name, registered_embedding in self.registered_faces.items():
|
||||||
|
# 归一化注册嵌入
|
||||||
|
reg_emb = registered_embedding / np.linalg.norm(registered_embedding)
|
||||||
|
|
||||||
|
# 计算余弦相似度
|
||||||
|
similarity = float(np.dot(query_emb, reg_emb))
|
||||||
|
|
||||||
|
if similarity > best_similarity:
|
||||||
|
best_similarity = similarity
|
||||||
|
best_name = name
|
||||||
|
|
||||||
|
return best_name, best_similarity
|
||||||
|
|
||||||
|
def calculate_clarity(self, face_region: np.ndarray) -> float:
|
||||||
|
"""
|
||||||
|
计算人脸区域的清晰度/模糊度
|
||||||
|
使用拉普拉斯方差方法:值越高表示图像越清晰
|
||||||
|
"""
|
||||||
|
if len(face_region.shape) == 3:
|
||||||
|
gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY)
|
||||||
|
else:
|
||||||
|
gray = face_region
|
||||||
|
|
||||||
|
# 计算拉普拉斯算子的方差
|
||||||
|
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
|
||||||
|
return laplacian_var
|
||||||
|
|
||||||
|
def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]:
|
||||||
|
"""
|
||||||
|
综合判断人脸质量是否可接受
|
||||||
|
返回: (是否可接受, 质量指标字典)
|
||||||
|
"""
|
||||||
|
quality_metrics = {}
|
||||||
|
|
||||||
|
is_acceptable = True
|
||||||
|
|
||||||
|
# 1. 检测置信度
|
||||||
|
quality_metrics['det_score'] = float(face.det_score)
|
||||||
|
|
||||||
|
# 2. 人脸姿态角度
|
||||||
|
if hasattr(face, 'pose') and face.pose is not None:
|
||||||
|
pitch, yaw, roll = face.pose
|
||||||
|
quality_metrics['pitch'] = float(pitch)
|
||||||
|
quality_metrics['yaw'] = float(yaw)
|
||||||
|
quality_metrics['roll'] = float(roll)
|
||||||
|
else:
|
||||||
|
quality_metrics['pitch'] = 100.0
|
||||||
|
quality_metrics['yaw'] = 100.0
|
||||||
|
quality_metrics['roll'] = 100.0
|
||||||
|
|
||||||
|
# 3. 人脸边界框信息
|
||||||
|
bbox = face.bbox
|
||||||
|
x1, y1, x2, y2 = bbox.astype(int)
|
||||||
|
width = x2 - x1
|
||||||
|
height = y2 - y1
|
||||||
|
quality_metrics['bbox_width'] = width
|
||||||
|
quality_metrics['bbox_height'] = height
|
||||||
|
quality_metrics['bbox_area'] = width * height
|
||||||
|
quality_metrics['aspect_ratio'] = width / height if height > 0 else 0
|
||||||
|
|
||||||
|
# 4. 图像清晰度检测
|
||||||
|
# 提取人脸区域
|
||||||
|
h, w = frame.shape[:2]
|
||||||
|
x1_clip = max(0, x1)
|
||||||
|
y1_clip = max(0, y1)
|
||||||
|
x2_clip = min(w, x2)
|
||||||
|
y2_clip = min(h, y2)
|
||||||
|
|
||||||
|
if x2_clip > x1_clip and y2_clip > y1_clip:
|
||||||
|
face_region = frame[y1_clip:y2_clip, x1_clip:x2_clip]
|
||||||
|
clarity_score = self.calculate_clarity(face_region)
|
||||||
|
quality_metrics['clarity_score'] = clarity_score
|
||||||
|
else:
|
||||||
|
quality_metrics['clarity_score'] = 0.0
|
||||||
|
|
||||||
|
# 5. 综合质量评分
|
||||||
|
base_score = quality_metrics['det_score']
|
||||||
|
|
||||||
|
# 清晰度惩罚
|
||||||
|
clarity_penalty = 0.0
|
||||||
|
if quality_metrics['clarity_score'] < self.clarity_threshold:
|
||||||
|
clarity_penalty = 0.3 # 清晰度不足严重惩罚
|
||||||
|
is_acceptable = False
|
||||||
|
|
||||||
|
# 姿态惩罚
|
||||||
|
pose_penalty = 0.0
|
||||||
|
if abs(quality_metrics['yaw']) > self.yaw_threshold:
|
||||||
|
pose_penalty += 0.2
|
||||||
|
is_acceptable = False
|
||||||
|
if abs(quality_metrics['pitch']) > self.pitch_threshold:
|
||||||
|
pose_penalty += 0.2
|
||||||
|
is_acceptable = False
|
||||||
|
|
||||||
|
# 尺寸惩罚
|
||||||
|
size_penalty = 0.0
|
||||||
|
if min(width, height) < self.min_face_size:
|
||||||
|
is_acceptable = False
|
||||||
|
size_penalty = 0.2
|
||||||
|
|
||||||
|
quality_metrics['quality_score'] = max(0.1, base_score - clarity_penalty - pose_penalty - size_penalty)
|
||||||
|
|
||||||
|
return is_acceptable, quality_metrics
|
||||||
|
|
||||||
|
def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
|
||||||
|
"""
|
||||||
|
处理单帧图像
|
||||||
|
返回: (原始帧, 识别结果列表)
|
||||||
|
"""
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
# 人脸检测和识别
|
||||||
|
faces = self.app.get(frame)
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for face in faces:
|
||||||
|
# 检查人脸质量是否可接受
|
||||||
|
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
|
||||||
|
|
||||||
|
# 查找最佳匹配
|
||||||
|
best_name, similarity = self.find_best_match(face.embedding)
|
||||||
|
|
||||||
|
# 根据名单模式判断是否匹配
|
||||||
|
if self.list_mode == "blacklist":
|
||||||
|
# 黑名单模式:在黑名单中即为匹配(需要关注)
|
||||||
|
is_match = best_name is not None and similarity >= self.similarity_threshold
|
||||||
|
else: # whitelist
|
||||||
|
# 白名单模式:在白名单中即为匹配(允许通过)
|
||||||
|
is_match = best_name is not None and similarity >= self.similarity_threshold
|
||||||
|
|
||||||
|
result = {
|
||||||
|
'bbox': face.bbox.astype(int).tolist(),
|
||||||
|
'similarity': similarity,
|
||||||
|
'best_match': best_name,
|
||||||
|
'is_match': is_match,
|
||||||
|
'det_score': float(face.det_score),
|
||||||
|
'quality_metrics': quality_metrics,
|
||||||
|
'is_acceptable': is_acceptable # 新增:是否可接受标志
|
||||||
|
}
|
||||||
|
results.append(result)
|
||||||
|
|
||||||
|
processing_time = (time.time() - start_time) * 1000
|
||||||
|
return frame, results, processing_time
|
||||||
|
|
||||||
|
def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
|
||||||
|
"""在帧上绘制检测结果和质量信息"""
|
||||||
|
bbox = result['bbox']
|
||||||
|
similarity = result['similarity']
|
||||||
|
is_match = result['is_match']
|
||||||
|
is_acceptable = result['is_acceptable']
|
||||||
|
quality_metrics = result['quality_metrics']
|
||||||
|
best_match = result['best_match']
|
||||||
|
|
||||||
|
# 选择颜色
|
||||||
|
if not is_acceptable:
|
||||||
|
color = (128, 128, 128) # 灰色 - 质量不可接受
|
||||||
|
else:
|
||||||
|
# 选择颜色 - 根据名单模式
|
||||||
|
if self.list_mode == "blacklist":
|
||||||
|
# 黑名单模式:匹配(在黑名单中)显示红色,不匹配显示绿色
|
||||||
|
color = (0, 0, 255) if is_match else (0, 255, 0) # 红色-黑名单, 绿色-正常
|
||||||
|
else: # whitelist
|
||||||
|
# 白名单模式:匹配(在白名单中)显示绿色,不匹配显示红色
|
||||||
|
color = (0, 255, 0) if is_match else (0, 0, 255) # 绿色-白名单, 红色-陌生人
|
||||||
|
|
||||||
|
# 绘制人脸框
|
||||||
|
x1, y1, x2, y2 = bbox
|
||||||
|
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
|
||||||
|
|
||||||
|
# 准备显示文本 - 只保留关键信息
|
||||||
|
text_lines = []
|
||||||
|
|
||||||
|
# 第一行:匹配状态
|
||||||
|
if not is_acceptable:
|
||||||
|
text_lines.append("LOW QUALITY")
|
||||||
|
else:
|
||||||
|
status = f"MATCH: {best_match}: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}"
|
||||||
|
text_lines.append(status)
|
||||||
|
|
||||||
|
# 第二行:质量得分(根据阈值显示颜色)
|
||||||
|
text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}")
|
||||||
|
|
||||||
|
# 第三行:检测得分
|
||||||
|
text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}")
|
||||||
|
|
||||||
|
# 第四行:清晰度
|
||||||
|
text_lines.append(f"Clarity: {quality_metrics['clarity_score']:.1f}")
|
||||||
|
|
||||||
|
# 第五行:姿态角度
|
||||||
|
text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°")
|
||||||
|
text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°")
|
||||||
|
# text_lines.append(f"Roll: {quality_metrics['roll']:.1f}°")
|
||||||
|
|
||||||
|
text_lines.append(f"Width: {quality_metrics['bbox_width']:.1f}")
|
||||||
|
text_lines.append(f"Height: {quality_metrics['bbox_height']:.1f}")
|
||||||
|
|
||||||
|
# 计算文本区域大小
|
||||||
|
max_text_width = 0
|
||||||
|
total_text_height = 0
|
||||||
|
line_heights = []
|
||||||
|
|
||||||
|
for line in text_lines:
|
||||||
|
(text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
|
||||||
|
max_text_width = max(max_text_width, text_width)
|
||||||
|
line_heights.append(text_height + baseline)
|
||||||
|
total_text_height += text_height + baseline + 2
|
||||||
|
|
||||||
|
# 绘制文本背景
|
||||||
|
bg_x1 = x1
|
||||||
|
bg_y1 = y1 - total_text_height - 10
|
||||||
|
bg_x2 = x1 + max_text_width + 10
|
||||||
|
bg_y2 = y1
|
||||||
|
|
||||||
|
# 如果背景超出图像顶部,调整到框下方
|
||||||
|
if bg_y1 < 0:
|
||||||
|
bg_y1 = y2
|
||||||
|
bg_y2 = y2 + total_text_height + 10
|
||||||
|
|
||||||
|
# 绘制半透明背景
|
||||||
|
overlay = frame.copy()
|
||||||
|
cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1)
|
||||||
|
alpha = 0.6
|
||||||
|
cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)
|
||||||
|
|
||||||
|
# 绘制文本
|
||||||
|
current_y = bg_y1 + 15
|
||||||
|
for i, line in enumerate(text_lines):
|
||||||
|
# 根据内容选择颜色 - 按照你的要求简化颜色规则
|
||||||
|
if i == 0: # 状态行
|
||||||
|
if not is_acceptable:
|
||||||
|
text_color = (128, 128, 128) # 灰色 - 质量差
|
||||||
|
elif is_match:
|
||||||
|
text_color = (0, 255, 0) # 绿色 - 匹配
|
||||||
|
else:
|
||||||
|
text_color = (0, 0, 255) # 红色 - 不匹配
|
||||||
|
elif i == 3: # 清晰度行
|
||||||
|
# 清晰度:低于阈值红色,大于等于阈值绿色
|
||||||
|
if quality_metrics['clarity_score'] >= self.clarity_threshold:
|
||||||
|
text_color = (255, 255, 255)
|
||||||
|
else:
|
||||||
|
text_color = (0, 0, 255)
|
||||||
|
elif i == 4: # pitch
|
||||||
|
if abs(quality_metrics['pitch']) > self.pitch_threshold:
|
||||||
|
text_color = (0, 0, 255) # 红色
|
||||||
|
else:
|
||||||
|
text_color = (255, 255, 255)
|
||||||
|
elif i == 5: # yaw
|
||||||
|
if abs(quality_metrics['yaw']) > self.yaw_threshold:
|
||||||
|
text_color = (0, 0, 255) # 红色
|
||||||
|
else:
|
||||||
|
text_color = (255, 255, 255)
|
||||||
|
elif i == 6: # 宽度
|
||||||
|
if quality_metrics['bbox_width'] < self.min_face_size:
|
||||||
|
text_color = (0, 0, 255) # 红色
|
||||||
|
else:
|
||||||
|
text_color = (255, 255, 255)
|
||||||
|
elif i == 7: # 高度
|
||||||
|
if quality_metrics['bbox_height'] < self.min_face_size:
|
||||||
|
text_color = (0, 0, 255) # 红色
|
||||||
|
else:
|
||||||
|
text_color = (255, 255, 255)
|
||||||
|
else:
|
||||||
|
text_color = (255, 255, 255) # 白色 - 其他信息
|
||||||
|
|
||||||
|
cv2.putText(frame, line, (x1 + 5, current_y),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1)
|
||||||
|
current_y += line_heights[i]
|
||||||
|
|
||||||
|
return frame
|
||||||
|
|
||||||
|
def draw_detections(self, frame: np.ndarray, results: List[Dict]) -> np.ndarray:
|
||||||
|
"""绘制所有检测结果"""
|
||||||
|
for result in results:
|
||||||
|
frame = self._draw_detection(frame, result)
|
||||||
|
return frame
|
||||||
|
|
||||||
|
def set_quality_thresholds(self, clarity_threshold: float = None,
|
||||||
|
quality_threshold: float = None,
|
||||||
|
min_face_size: int = None):
|
||||||
|
"""设置质量阈值"""
|
||||||
|
if clarity_threshold is not None:
|
||||||
|
self.clarity_threshold = clarity_threshold
|
||||||
|
if quality_threshold is not None:
|
||||||
|
self.quality_threshold = quality_threshold
|
||||||
|
if min_face_size is not None:
|
||||||
|
self.min_face_size = min_face_size
|
||||||
|
print(
|
||||||
|
f"✅ 质量阈值更新 - 清晰度: {self.clarity_threshold}, 质量得分: {self.quality_threshold}, 最小尺寸: {self.min_face_size}")
|
||||||
|
|
||||||
|
def get_registered_face_count(self) -> int:
|
||||||
|
"""获取已注册人脸数量"""
|
||||||
|
return len(self.registered_faces)
|
||||||
|
|
||||||
|
def get_list_mode(self) -> str:
|
||||||
|
"""获取当前名单模式"""
|
||||||
|
return self.list_mode
|
||||||
275
src/video_face_recognition_cann_3.py
Normal file
275
src/video_face_recognition_cann_3.py
Normal file
@@ -0,0 +1,275 @@
|
|||||||
|
# video_face_recognition_cann.py
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
from face_recognition_algorithm import FaceRecognitionAlgorithm
|
||||||
|
|
||||||
|
|
||||||
|
def process_video_file(algorithm: FaceRecognitionAlgorithm, video_path: str, output_path: str = None,
|
||||||
|
skip_frames: int = 0, show_preview: bool = True):
|
||||||
|
"""
|
||||||
|
处理视频文件
|
||||||
|
|
||||||
|
Args:
|
||||||
|
algorithm: FaceRecognitionAlgorithm实例
|
||||||
|
video_path: 输入视频路径
|
||||||
|
output_path: 输出视频路径
|
||||||
|
skip_frames: 跳帧数,用于提高处理速度
|
||||||
|
show_preview: 是否显示实时预览
|
||||||
|
"""
|
||||||
|
if not os.path.exists(video_path):
|
||||||
|
print(f"❌ 视频文件不存在: {video_path}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 打开视频文件
|
||||||
|
cap = cv2.VideoCapture(video_path)
|
||||||
|
if not cap.isOpened():
|
||||||
|
print(f"❌ 无法打开视频文件: {video_path}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 获取视频信息
|
||||||
|
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||||
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||||
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||||
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||||
|
|
||||||
|
print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
|
||||||
|
print(f"🎯 当前模式: {algorithm.get_list_mode()}, 注册人脸数: {algorithm.get_registered_face_count()}")
|
||||||
|
|
||||||
|
# 设置输出视频
|
||||||
|
if output_path:
|
||||||
|
# 确保输出目录存在
|
||||||
|
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||||
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
||||||
|
out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height))
|
||||||
|
else:
|
||||||
|
out = None
|
||||||
|
|
||||||
|
# 性能统计
|
||||||
|
frame_count = 0
|
||||||
|
processed_frames = 0
|
||||||
|
processing_times = []
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
print("🚀 开始处理视频...")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
ret, frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
break
|
||||||
|
|
||||||
|
# 跳帧处理
|
||||||
|
if skip_frames > 0 and frame_count % (skip_frames + 1) != 0:
|
||||||
|
frame_count += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 处理当前帧 - 获取结果
|
||||||
|
processed_frame, results, processing_time = algorithm.process_frame(frame)
|
||||||
|
print(f"face process time : {processing_time}")
|
||||||
|
|
||||||
|
# 在帧上绘制检测结果
|
||||||
|
processed_frame = algorithm.draw_detections(processed_frame, results)
|
||||||
|
|
||||||
|
# 记录处理时间
|
||||||
|
processing_times.append(processing_time)
|
||||||
|
|
||||||
|
# 写入输出视频
|
||||||
|
if out:
|
||||||
|
out.write(processed_frame)
|
||||||
|
|
||||||
|
# 显示预览
|
||||||
|
if show_preview:
|
||||||
|
# 添加性能信息
|
||||||
|
fps_text = f"Frame: {frame_count}/{total_frames} | Faces: {len(results)} | Mode: {algorithm.get_list_mode()}"
|
||||||
|
cv2.putText(processed_frame, fps_text, (10, 30),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
||||||
|
|
||||||
|
# 添加名单统计
|
||||||
|
match_count = sum(1 for r in results if r['is_match'])
|
||||||
|
list_text = f"Match: {match_count}/{len(results)}"
|
||||||
|
cv2.putText(processed_frame, list_text, (10, 60),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
||||||
|
|
||||||
|
cv2.imshow('Video Face Recognition', processed_frame)
|
||||||
|
if cv2.waitKey(1) & 0xFF == ord('q'):
|
||||||
|
break
|
||||||
|
|
||||||
|
frame_count += 1
|
||||||
|
processed_frames += 1
|
||||||
|
|
||||||
|
# 进度显示
|
||||||
|
if frame_count % 30 == 0:
|
||||||
|
progress = (frame_count / total_frames) * 100
|
||||||
|
print(f"📊 处理进度: {progress:.1f}% ({frame_count}/{total_frames})")
|
||||||
|
|
||||||
|
# 清理资源
|
||||||
|
cap.release()
|
||||||
|
if out:
|
||||||
|
out.release()
|
||||||
|
if show_preview:
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
|
||||||
|
# 性能统计
|
||||||
|
total_time = time.time() - start_time
|
||||||
|
avg_processing_time = np.mean(processing_times) if processing_times else 0
|
||||||
|
|
||||||
|
print(f"\n🎉 视频处理完成!")
|
||||||
|
print(f"📊 性能统计:")
|
||||||
|
print(f" 总处理帧数: {processed_frames}")
|
||||||
|
print(f" 总耗时: {total_time:.1f}秒")
|
||||||
|
print(f" 平均每帧: {avg_processing_time:.1f}ms")
|
||||||
|
print(f" 实际FPS: {processed_frames / total_time:.1f}")
|
||||||
|
if output_path:
|
||||||
|
print(f" 输出视频: {output_path}")
|
||||||
|
|
||||||
|
|
||||||
|
def process_webcam(algorithm: FaceRecognitionAlgorithm, camera_id: int = 0, output_path: str = None):
|
||||||
|
"""
|
||||||
|
处理摄像头实时视频流
|
||||||
|
"""
|
||||||
|
cap = cv2.VideoCapture(camera_id)
|
||||||
|
if not cap.isOpened():
|
||||||
|
print(f"❌ 无法打开摄像头 {camera_id}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 设置摄像头分辨率(可选)
|
||||||
|
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
|
||||||
|
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
|
||||||
|
|
||||||
|
# 设置输出视频
|
||||||
|
if output_path:
|
||||||
|
# 确保输出目录存在
|
||||||
|
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||||
|
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||||
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||||
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||||
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
||||||
|
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
|
||||||
|
else:
|
||||||
|
out = None
|
||||||
|
|
||||||
|
# 性能统计
|
||||||
|
processing_times = []
|
||||||
|
|
||||||
|
print(f"🎥 开始摄像头实时识别 - 模式: {algorithm.get_list_mode()} (按 'q' 退出)...")
|
||||||
|
print(f"📋 注册人脸数: {algorithm.get_registered_face_count()}")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
ret, frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
print("❌ 无法读取摄像头帧")
|
||||||
|
break
|
||||||
|
|
||||||
|
# 处理当前帧 - 获取结果
|
||||||
|
processed_frame, results, processing_time = algorithm.process_frame(frame)
|
||||||
|
print(f"face process time : {processing_time}")
|
||||||
|
# 在帧上绘制检测结果
|
||||||
|
processed_frame = algorithm.draw_detections(processed_frame, results)
|
||||||
|
|
||||||
|
# 记录处理时间
|
||||||
|
processing_times.append(processing_time)
|
||||||
|
|
||||||
|
# 添加实时信息
|
||||||
|
current_fps = 1000 / processing_times[-1] if processing_times else 0
|
||||||
|
info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)} | Mode: {algorithm.get_list_mode()}"
|
||||||
|
cv2.putText(processed_frame, info_text, (10, 30),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
||||||
|
|
||||||
|
# 添加名单统计
|
||||||
|
match_count = sum(1 for r in results if r['is_match'])
|
||||||
|
list_text = f"Match: {match_count}/{len(results)}"
|
||||||
|
cv2.putText(processed_frame, list_text, (10, 60),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
||||||
|
|
||||||
|
# 写入输出
|
||||||
|
if out:
|
||||||
|
out.write(processed_frame)
|
||||||
|
|
||||||
|
# 显示预览
|
||||||
|
cv2.imshow('Real-time Face Recognition', processed_frame)
|
||||||
|
|
||||||
|
# 按'q'退出
|
||||||
|
if cv2.waitKey(1) & 0xFF == ord('q'):
|
||||||
|
break
|
||||||
|
|
||||||
|
# 清理资源
|
||||||
|
cap.release()
|
||||||
|
if out:
|
||||||
|
out.release()
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
|
||||||
|
print("✅ 摄像头处理结束")
|
||||||
|
|
||||||
|
|
||||||
|
# 使用示例
|
||||||
|
def main():
|
||||||
|
# 直接创建人脸识别算法实例
|
||||||
|
# 使用NPU:
|
||||||
|
# algorithm = FaceRecognitionAlgorithm(use_gpu=False, use_npu=True, npu_device_id=0)
|
||||||
|
# 使用GPU:
|
||||||
|
algorithm = FaceRecognitionAlgorithm(use_gpu=True, use_npu=False)
|
||||||
|
# 使用CPU:
|
||||||
|
# algorithm = FaceRecognitionAlgorithm(use_gpu=False, use_npu=False)
|
||||||
|
# algorithm = FaceRecognitionAlgorithm(use_gpu=True, use_npu=False) # 默认使用GPU
|
||||||
|
|
||||||
|
# 设置名单模式
|
||||||
|
# algorithm.set_list_mode("blacklist") # 黑名单模式
|
||||||
|
# algorithm.set_list_mode("whitelist") # 白名单模式
|
||||||
|
|
||||||
|
# 加载注册人脸
|
||||||
|
register_dir = "test_data/register" # 注册图片目录
|
||||||
|
if os.path.exists(register_dir):
|
||||||
|
algorithm.load_registered_faces(register_dir)
|
||||||
|
else:
|
||||||
|
print(f"⚠️ 注册目录不存在: {register_dir}")
|
||||||
|
#
|
||||||
|
# # 设置质量阈值(可根据实际情况调整)
|
||||||
|
# algorithm.set_quality_thresholds(
|
||||||
|
# clarity_threshold=1000.0, # 清晰度阈值
|
||||||
|
# quality_threshold=0.6, # 质量得分阈值
|
||||||
|
# min_face_size=30
|
||||||
|
# )
|
||||||
|
|
||||||
|
# # 选择处理模式
|
||||||
|
# print("请选择处理模式:")
|
||||||
|
# print("1. 处理视频文件")
|
||||||
|
# print("2. 实时摄像头")
|
||||||
|
#
|
||||||
|
# choice = input("请输入选择 (1 或 2): ").strip()
|
||||||
|
|
||||||
|
choice = "1"
|
||||||
|
|
||||||
|
if choice == "1":
|
||||||
|
# 处理视频文件
|
||||||
|
video_path = "test_data/video/video_2.mp4"
|
||||||
|
output_path = "test_data/output_video/video_2_white_9_gpu.mp4"
|
||||||
|
# output_path = "test_data/output_video/video_2_black_2.mp4"
|
||||||
|
|
||||||
|
# 性能优化:跳帧处理
|
||||||
|
skip_frames = 2
|
||||||
|
|
||||||
|
process_video_file(
|
||||||
|
algorithm=algorithm,
|
||||||
|
video_path=video_path,
|
||||||
|
output_path=output_path,
|
||||||
|
skip_frames=skip_frames,
|
||||||
|
show_preview=False
|
||||||
|
)
|
||||||
|
|
||||||
|
elif choice == "2":
|
||||||
|
# 实时摄像头
|
||||||
|
output_path = "webcam_recording.mp4"
|
||||||
|
|
||||||
|
process_webcam(
|
||||||
|
algorithm=algorithm,
|
||||||
|
camera_id=0,
|
||||||
|
output_path=output_path
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
print("❌ 无效选择")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user