阶段性人脸识别成果,可以对视频做白名单、黑名单识别

This commit is contained in:
zqc
2025-11-26 16:09:09 +08:00
parent 3dfe075080
commit 80491f277b
7 changed files with 2713 additions and 11 deletions

View File

@@ -33,7 +33,7 @@ class SingleFaceComparisonSystem:
self.target_image_path = None
# 相似度阈值
self.similarity_threshold = 0.25
self.similarity_threshold = 0.3
# 颜色配置
self.colors = {
@@ -141,8 +141,8 @@ class SingleFaceComparisonSystem:
"""
if similarity >= self.similarity_threshold:
return self.colors['match_high'] # 高相似度 - 绿色
elif similarity >= self.similarity_threshold/2:
return self.colors['match_low'] # 低相似度 - 橙色
# elif similarity >= self.similarity_threshold/2:
# return self.colors['match_low'] # 低相似度 - 橙色
else:
return self.colors['no_match'] # 不匹配 - 红色
@@ -383,8 +383,9 @@ def demo_usage():
os.makedirs("test_data/query", exist_ok=True)
os.makedirs("test_data/output", exist_ok=True)
target_image = "test_data/register/person2.jpg"
# target_image = "test_data/register/person2.jpg"
# target_image = "test_data/register/person1.png"
target_image = "test_data/register/ztk.jpg"
# 设置目标人脸
@@ -431,8 +432,8 @@ def demo_usage():
print("\n=== 批量比对图像 ===")
if os.path.exists("test_data/query") and len(os.listdir("test_data/query")) > 0:
stats = face_system.batch_process_with_target(
input_dir="test_data/query",
output_dir="test_data/output/batch_comparison"
input_dir="test_data/query/2",
output_dir="test_data/output/batch_comparison/2"
)
print(f"批量比对统计:")
@@ -447,7 +448,7 @@ def demo_usage():
# 5. 调整相似度阈值
print("\n=== 调整阈值 ===")
face_system.set_similarity_threshold(0.25) # 提高阈值,更严格
face_system.set_similarity_threshold(0.3) # 提高阈值,更严格
# 高级使用示例

View File

@@ -294,8 +294,8 @@ def main():
video_system = VideoFaceRecognition(use_gpu=True)
# 设置目标人脸(可选)
target_image = "test_data/register/person1.png"
# target_image = "test_data/register/cjh.jpg"
# target_image = "test_data/register/person1.png"
target_image = "test_data/register/sy.jpg"
if os.path.exists(target_image):
video_system.set_target_face(target_image, "目标人物")
@@ -310,8 +310,8 @@ def main():
if choice == "1":
# 处理视频文件
video_path = "test_data/video/test_video.mp4"
output_path = "test_data/output_video/zqc.mp4"
video_path = "test_data/video/video_1.mp4"
output_path = "test_data/output_video/video_1.mp4"
# 性能优化:跳帧处理
skip_frames = 1 # 每2帧处理1帧提高速度

View File

@@ -0,0 +1,466 @@
# video_face_recognition.py
import cv2
import numpy as np
import time
from insightface.app import FaceAnalysis
from typing import List, Dict, Tuple
import os
class VideoFaceRecognition:
    """
    Video face recognition system.

    Supports real-time webcam streams and video-file processing.  Each
    detected face is optionally compared against a single registered
    target embedding and annotated with per-face quality metrics.
    """

    def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True):
        # Initialize the InsightFace detection/recognition model.
        self.app = FaceAnalysis(name=model_name)
        self.app.prepare(
            ctx_id=0 if use_gpu else -1,  # 0 = first GPU, -1 = CPU
            det_thresh=0.3,
            det_size=(640, 640)
        )
        self.target_embedding = None      # embedding of the registered target face, or None
        self.target_id = None             # label of the registered target face
        self.similarity_threshold = 0.3   # cosine-similarity threshold for a "match"
        # Performance statistics
        self.frame_count = 0
        self.processing_times = []        # per-frame processing time, in milliseconds
        print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}")

    def set_target_face(self, image_path: str, person_id: str = "target") -> bool:
        """Register the target face from an image file; returns True on success."""
        img = cv2.imread(image_path)
        if img is None:
            print(f"❌ 无法读取目标图像: {image_path}")
            return False
        faces = self.app.get(img)
        if not faces:
            print(f"❌ 目标图像中未检测到人脸: {image_path}")
            return False
        # Use the first detected face in the registration image.
        self.target_embedding = faces[0].embedding
        self.target_id = person_id
        print(f"✅ 目标人脸设置: {person_id}")
        return True

    def calculate_face_quality(self, face) -> Dict:
        """
        Compute quality metrics for a detected face.

        Returns a dict with detection confidence, head-pose angles,
        bounding-box geometry, landmark-distance variance, and an overall
        quality score.
        """
        quality_metrics = {}
        # 1. Detection confidence
        quality_metrics['det_score'] = float(face.det_score)
        # 2. Head-pose angles (pitch, yaw, roll), when the model provides them
        if hasattr(face, 'pose') and face.pose is not None:
            pitch, yaw, roll = face.pose
            quality_metrics['pitch'] = float(pitch)  # up/down rotation
            quality_metrics['yaw'] = float(yaw)      # left/right rotation
            quality_metrics['roll'] = float(roll)    # in-plane rotation
        else:
            quality_metrics['pitch'] = 0.0
            quality_metrics['yaw'] = 0.0
            quality_metrics['roll'] = 0.0
        # 3. Bounding-box geometry
        bbox = face.bbox
        width = bbox[2] - bbox[0]
        height = bbox[3] - bbox[1]
        quality_metrics['bbox_area'] = width * height
        quality_metrics['aspect_ratio'] = width / height if height > 0 else 0
        # 4. Landmark quality: variance of all pairwise keypoint distances
        if hasattr(face, 'kps') and face.kps is not None:
            kps = face.kps
            if len(kps) >= 5:
                distances = []
                for i in range(len(kps)):
                    for j in range(i + 1, len(kps)):
                        dist = np.linalg.norm(kps[i] - kps[j])
                        distances.append(dist)
                if distances:
                    quality_metrics['kps_variance'] = float(np.var(distances))
                else:
                    quality_metrics['kps_variance'] = 0.0
            else:
                quality_metrics['kps_variance'] = 0.0
        else:
            quality_metrics['kps_variance'] = 0.0
        # 5. Overall quality score: detection score minus pose penalties,
        #    floored at 0.1 so downstream consumers never see a zero score.
        base_score = quality_metrics['det_score']
        pose_penalty = 0.0
        if abs(quality_metrics['yaw']) > 30:    # penalize yaw beyond 30 degrees
            pose_penalty += 0.2
        if abs(quality_metrics['pitch']) > 20:  # penalize pitch beyond 20 degrees
            pose_penalty += 0.2
        quality_metrics['quality_score'] = max(0.1, base_score - pose_penalty)
        return quality_metrics

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """
        Detect, recognize and annotate all faces in one frame.

        Returns: (annotated frame, list of per-face result dicts).
        """
        start_time = time.time()
        # Face detection and recognition.
        faces = self.app.get(frame)
        results = []
        for face in faces:
            similarity = 0.0
            if self.target_embedding is not None:
                # Cosine similarity between L2-normalized embeddings.
                emb1 = face.embedding / np.linalg.norm(face.embedding)
                emb2 = self.target_embedding / np.linalg.norm(self.target_embedding)
                similarity = float(np.dot(emb1, emb2))
            # Per-face quality metrics (pose, size, landmarks, ...).
            quality_metrics = self.calculate_face_quality(face)
            result = {
                'bbox': face.bbox.astype(int).tolist(),
                'similarity': similarity,
                'is_match': similarity >= self.similarity_threshold,
                'gender': 'Male' if face.gender == 1 else 'Female',
                'age': int(face.age),
                'det_score': float(face.det_score),
                'quality_metrics': quality_metrics
            }
            results.append(result)
            # Draw the annotation for this face onto the frame.
            frame = self._draw_detection(frame, result)
        # Performance bookkeeping (milliseconds per frame).
        processing_time = (time.time() - start_time) * 1000
        self.processing_times.append(processing_time)
        self.frame_count += 1
        return frame, results

    def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
        """Draw the face box plus match/quality text onto the frame."""
        bbox = result['bbox']
        similarity = result['similarity']
        is_match = result['is_match']
        quality_metrics = result['quality_metrics']
        # Box color: green = match, red = no match.
        if is_match:
            color = (0, 255, 0)
        else:
            color = (0, 0, 255)
        # Draw the face bounding box.
        x1, y1, x2, y2 = bbox
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        # Build the text lines shown next to the face.
        text_lines = []
        # Line 0: match status and similarity.
        if self.target_id:
            status = f"MATCH: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}"
            text_lines.append(status)
        else:
            text_lines.append(f"Similarity: {similarity:.3f}")
        # Line 1: gender/age.
        text_lines.append(f"{result['gender']}/{result['age']}")
        # Lines 2-3: quality score and detection score.
        text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}")
        text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}")
        # Lines 4-6: pose angles.
        text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°")
        text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°")
        text_lines.append(f"Roll: {quality_metrics['roll']:.1f}°")
        # Lines 7-8: bounding-box geometry.
        text_lines.append(f"Area: {quality_metrics['bbox_area']:.0f}")
        text_lines.append(f"Aspect: {quality_metrics['aspect_ratio']:.2f}")
        # Measure the text block.
        max_text_width = 0
        total_text_height = 0
        line_heights = []
        for line in text_lines:
            (text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
            max_text_width = max(max_text_width, text_width)
            line_heights.append(text_height + baseline)
            total_text_height += text_height + baseline + 2  # 2 px line spacing
        # Background rectangle above the face box...
        bg_x1 = x1
        bg_y1 = y1 - total_text_height - 10
        bg_x2 = x1 + max_text_width + 10
        bg_y2 = y1
        # ...or below it when there is no room above the frame top.
        if bg_y1 < 0:
            bg_y1 = y2
            bg_y2 = y2 + total_text_height + 10
        # Semi-transparent black background.
        overlay = frame.copy()
        cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1)
        alpha = 0.6  # overlay opacity
        cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)
        # Render each line, color-coded by its meaning.
        current_y = bg_y1 + 15
        for i, line in enumerate(text_lines):
            if i == 0:  # match-status line
                text_color = (0, 255, 0) if is_match else (0, 0, 255)
            elif i in [2, 3]:  # quality/detection score lines
                # Color by the overall quality score.
                quality = quality_metrics['quality_score']
                if quality > 0.7:
                    text_color = (0, 255, 0)    # green - high quality
                elif quality > 0.4:
                    text_color = (0, 255, 255)  # yellow - medium quality
                else:
                    text_color = (0, 0, 255)    # red - low quality
            elif i in [4, 5, 6]:  # pose-angle lines
                # Color by how extreme the head pose is.
                if abs(quality_metrics['yaw']) > 45 or abs(quality_metrics['pitch']) > 30:
                    text_color = (0, 0, 255)    # red - angle too large
                elif abs(quality_metrics['yaw']) > 30 or abs(quality_metrics['pitch']) > 20:
                    text_color = (0, 255, 255)  # yellow - large angle
                else:
                    text_color = (0, 255, 0)    # green - good angle
            else:
                text_color = (255, 255, 255)    # white - plain info
            cv2.putText(frame, line, (x1 + 5, current_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1)
            current_y += line_heights[i]
        return frame

    def process_video_file(self, video_path: str, output_path: str = None,
                           skip_frames: int = 0, show_preview: bool = True):
        """
        Process a video file frame by frame.

        Args:
            video_path: input video path
            output_path: optional annotated output video path
            skip_frames: number of frames skipped between processed frames
            show_preview: whether to display a live preview window
        """
        if not os.path.exists(video_path):
            print(f"❌ 视频文件不存在: {video_path}")
            return
        # Open the video file.
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"❌ 无法打开视频文件: {video_path}")
            return
        # Source video properties.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
        # Output writer; FPS is reduced to match the frame-skipping rate.
        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height))
        else:
            out = None
        # Frame-processing loop.
        frame_index = 0
        processed_frames = 0
        start_time = time.time()
        print("🚀 开始处理视频...")
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Frame skipping: process 1 of every (skip_frames + 1) frames.
            if skip_frames > 0 and frame_index % (skip_frames + 1) != 0:
                frame_index += 1
                continue
            # Process the current frame.
            processed_frame, results = self.process_frame(frame)
            # Write to the output video.
            if out:
                out.write(processed_frame)
            # Live preview.
            if show_preview:
                # Overlay progress/performance info on the preview.
                fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)}"
                cv2.putText(processed_frame, fps_text, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
                cv2.imshow('Video Face Recognition', processed_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            frame_index += 1
            processed_frames += 1
            # Console progress report every 30 frames.
            # NOTE(review): divides by total_frames — would raise if the
            # container reports a frame count of 0; confirm inputs.
            if frame_index % 30 == 0:
                progress = (frame_index / total_frames) * 100
                print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})")
        # Release resources.
        cap.release()
        if out:
            out.release()
        if show_preview:
            cv2.destroyAllWindows()
        # Summary statistics.
        total_time = time.time() - start_time
        avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0
        print(f"\n🎉 视频处理完成!")
        print(f"📊 性能统计:")
        print(f" 总处理帧数: {processed_frames}")
        print(f" 总耗时: {total_time:.1f}")
        print(f" 平均每帧: {avg_processing_time:.1f}ms")
        print(f" 实际FPS: {processed_frames / total_time:.1f}")
        if output_path:
            print(f" 输出视频: {output_path}")

    def process_webcam(self, camera_id: int = 0, output_path: str = None):
        """
        Run real-time recognition on a webcam stream (press 'q' to quit).
        """
        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            print(f"❌ 无法打开摄像头 {camera_id}")
            return
        # Request a 720p capture resolution (best effort).
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        # Optional recording of the annotated stream.
        if output_path:
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        else:
            out = None
        print("🎥 开始摄像头实时识别 (按 'q' 退出)...")
        while True:
            ret, frame = cap.read()
            if not ret:
                print("❌ 无法读取摄像头帧")
                break
            # Process the current frame.
            processed_frame, results = self.process_frame(frame)
            # Live FPS derived from the last frame's processing time.
            current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0
            info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}"
            cv2.putText(processed_frame, info_text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            # Write the recording.
            if out:
                out.write(processed_frame)
            # Show the preview.
            cv2.imshow('Real-time Face Recognition', processed_frame)
            # 'q' quits the loop.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        # Release resources.
        cap.release()
        if out:
            out.release()
        cv2.destroyAllWindows()
        print("✅ 摄像头处理结束")
# 使用示例
def main():
    """Interactive demo: run recognition on a video file or a live webcam."""
    recognizer = VideoFaceRecognition(use_gpu=True)

    # Register a target face if the sample image is present (optional).
    target_image = "test_data/register/sy.jpg"
    if os.path.exists(target_image):
        recognizer.set_target_face(target_image, "目标人物")

    # Ask the user which processing mode to run.
    print("请选择处理模式:")
    print("1. 处理视频文件")
    print("2. 实时摄像头")
    choice = input("请输入选择 (1 或 2): ").strip()

    if choice == "1":
        # Offline video-file mode; skip_frames=1 processes every 2nd frame.
        recognizer.process_video_file(
            video_path="test_data/video/video_1.mp4",
            output_path="test_data/output_video/video_1_quality.mp4",
            skip_frames=1,
            show_preview=True,
        )
    elif choice == "2":
        # Live webcam mode, recording the annotated stream to disk.
        recognizer.process_webcam(
            camera_id=0,
            output_path="webcam_recording.mp4",
        )
    else:
        print("❌ 无效选择")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,541 @@
# video_face_recognition.py
import cv2
import numpy as np
import time
from insightface.app import FaceAnalysis
from typing import List, Dict, Tuple
import os
class VideoFaceRecognition:
    """
    Video face recognition system with face-quality gating.

    Supports real-time webcam streams and video-file processing.  Each
    detected face is scored for quality (detection confidence, pose,
    size, sharpness); only acceptable faces are compared against the
    registered target embedding.
    """

    def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True):
        # Initialize the InsightFace detection/recognition model.
        self.app = FaceAnalysis(name=model_name)
        self.app.prepare(
            ctx_id=0 if use_gpu else -1,  # 0 = first GPU, -1 = CPU
            det_thresh=0.2,
            det_size=(640, 640)
        )
        self.target_embedding = None      # embedding of the registered target face, or None
        self.target_id = None             # label of the registered target face
        self.similarity_threshold = 0.3   # cosine-similarity threshold for a "match"
        # Quality thresholds
        self.clarity_threshold = 50.0     # Laplacian-variance threshold; below = blurry
        self.min_face_size = 40           # minimum face side length in pixels
        # Performance statistics
        self.frame_count = 0
        self.processing_times = []        # per-frame processing time, in milliseconds
        print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}")

    def set_target_face(self, image_path: str, person_id: str = "target") -> bool:
        """Register the target face from an image file; returns True on success."""
        img = cv2.imread(image_path)
        if img is None:
            print(f"❌ 无法读取目标图像: {image_path}")
            return False
        faces = self.app.get(img)
        if not faces:
            print(f"❌ 目标图像中未检测到人脸: {image_path}")
            return False
        # Use the first detected face in the registration image.
        self.target_embedding = faces[0].embedding
        self.target_id = person_id
        print(f"✅ 目标人脸设置: {person_id}")
        return True

    def calculate_clarity(self, face_region: np.ndarray) -> float:
        """
        Estimate the sharpness of a face crop.

        Uses the variance of the Laplacian: higher values indicate a
        sharper image.
        """
        if len(face_region.shape) == 3:
            gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY)
        else:
            gray = face_region
        # Variance of the Laplacian response.
        laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        return laplacian_var

    def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]:
        """
        Decide whether a detected face is good enough for recognition.

        Returns: (is_acceptable, quality_metrics dict).
        """
        quality_metrics = {}
        # 1. Detection confidence
        quality_metrics['det_score'] = float(face.det_score)
        # 2. Head-pose angles
        if hasattr(face, 'pose') and face.pose is not None:
            pitch, yaw, roll = face.pose
            quality_metrics['pitch'] = float(pitch)
            quality_metrics['yaw'] = float(yaw)
            quality_metrics['roll'] = float(roll)
        else:
            quality_metrics['pitch'] = 0.0
            quality_metrics['yaw'] = 0.0
            quality_metrics['roll'] = 0.0
        # 3. Bounding-box geometry
        bbox = face.bbox
        x1, y1, x2, y2 = bbox.astype(int)
        width = x2 - x1
        height = y2 - y1
        quality_metrics['bbox_area'] = width * height
        quality_metrics['aspect_ratio'] = width / height if height > 0 else 0
        # 4. Sharpness of the face crop, clipped to the frame bounds.
        h, w = frame.shape[:2]
        x1_clip = max(0, x1)
        y1_clip = max(0, y1)
        x2_clip = min(w, x2)
        y2_clip = min(h, y2)
        if x2_clip > x1_clip and y2_clip > y1_clip:
            face_region = frame[y1_clip:y2_clip, x1_clip:x2_clip]
            clarity_score = self.calculate_clarity(face_region)
            quality_metrics['clarity_score'] = clarity_score
        else:
            quality_metrics['clarity_score'] = 0.0
        # 5. Landmark quality: variance of all pairwise keypoint distances.
        if hasattr(face, 'kps') and face.kps is not None:
            kps = face.kps
            if len(kps) >= 5:
                distances = []
                for i in range(len(kps)):
                    for j in range(i + 1, len(kps)):
                        dist = np.linalg.norm(kps[i] - kps[j])
                        distances.append(dist)
                if distances:
                    quality_metrics['kps_variance'] = float(np.var(distances))
                else:
                    quality_metrics['kps_variance'] = 0.0
            else:
                quality_metrics['kps_variance'] = 0.0
        else:
            quality_metrics['kps_variance'] = 0.0
        # 6. Overall quality score with clarity/pose/size penalties,
        #    floored at 0.1 so consumers never see a zero score.
        base_score = quality_metrics['det_score']
        clarity_penalty = 0.0
        if quality_metrics['clarity_score'] < self.clarity_threshold:
            clarity_penalty = 0.3  # heavy penalty for a blurry face
        pose_penalty = 0.0
        if abs(quality_metrics['yaw']) > 30:
            pose_penalty += 0.2
        if abs(quality_metrics['pitch']) > 20:
            pose_penalty += 0.2
        size_penalty = 0.0
        if quality_metrics['bbox_area'] < (self.min_face_size ** 2):
            size_penalty = 0.2
        quality_metrics['quality_score'] = max(0.1, base_score - clarity_penalty - pose_penalty - size_penalty)
        # Hard acceptance gate: confidence, sharpness, size and pose limits.
        is_acceptable = (
            quality_metrics['det_score'] > 0.5 and
            quality_metrics['clarity_score'] >= self.clarity_threshold and
            quality_metrics['bbox_area'] >= (self.min_face_size ** 2) and
            abs(quality_metrics['yaw']) < 60 and
            abs(quality_metrics['pitch']) < 45
        )
        return is_acceptable, quality_metrics

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """
        Detect, quality-gate, recognize and annotate all faces in one frame.

        Returns: (annotated frame, list of per-face result dicts).
        """
        start_time = time.time()
        faces = self.app.get(frame)
        results = []
        for face in faces:
            # Quality gate first; low-quality faces are not matched.
            is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
            similarity = 0.0
            if is_acceptable and self.target_embedding is not None:
                # Cosine similarity between L2-normalized embeddings.
                emb1 = face.embedding / np.linalg.norm(face.embedding)
                emb2 = self.target_embedding / np.linalg.norm(self.target_embedding)
                similarity = float(np.dot(emb1, emb2))
            result = {
                'bbox': face.bbox.astype(int).tolist(),
                'similarity': similarity,
                'is_match': similarity >= self.similarity_threshold,
                'gender': 'Male' if face.gender == 1 else 'Female',
                'age': int(face.age),
                'det_score': float(face.det_score),
                'quality_metrics': quality_metrics,
                'is_acceptable': is_acceptable  # whether the face passed the quality gate
            }
            results.append(result)
            # Draw the annotation for this face onto the frame.
            frame = self._draw_detection(frame, result)
        # Performance bookkeeping (milliseconds per frame).
        processing_time = (time.time() - start_time) * 1000
        self.processing_times.append(processing_time)
        self.frame_count += 1
        return frame, results

    def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
        """Draw the face box plus match/quality text onto the frame."""
        bbox = result['bbox']
        similarity = result['similarity']
        is_match = result['is_match']
        is_acceptable = result['is_acceptable']
        quality_metrics = result['quality_metrics']
        # Box color: gray = rejected quality, green = match, red = no match.
        if not is_acceptable:
            color = (128, 128, 128)
        elif is_match:
            color = (0, 255, 0)
        else:
            color = (0, 0, 255)
        # Draw the face bounding box.
        x1, y1, x2, y2 = bbox
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        # Build the text lines shown next to the face.
        text_lines = []
        # Line 0: quality/match status.
        if not is_acceptable:
            text_lines.append("LOW QUALITY")
        elif self.target_id:
            status = f"MATCH: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}"
            text_lines.append(status)
        else:
            text_lines.append(f"Similarity: {similarity:.3f}")
        # Line 1: gender/age.
        text_lines.append(f"{result['gender']}/{result['age']}")
        # Lines 2-3: quality score and detection score.
        text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}")
        text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}")
        # Line 4: sharpness verdict (key gating metric).
        clarity_status = "CLEAR" if quality_metrics['clarity_score'] >= self.clarity_threshold else "BLUR"
        text_lines.append(f"Clarity: {clarity_status} ({quality_metrics['clarity_score']:.1f})")
        # Lines 5-7: pose angles.
        text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°")
        text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°")
        text_lines.append(f"Roll: {quality_metrics['roll']:.1f}°")
        # Line 8: bounding-box area.
        text_lines.append(f"Area: {quality_metrics['bbox_area']:.0f}")
        # Measure the text block.
        max_text_width = 0
        total_text_height = 0
        line_heights = []
        for line in text_lines:
            (text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
            max_text_width = max(max_text_width, text_width)
            line_heights.append(text_height + baseline)
            total_text_height += text_height + baseline + 2
        # Background rectangle above the face box...
        bg_x1 = x1
        bg_y1 = y1 - total_text_height - 10
        bg_x2 = x1 + max_text_width + 10
        bg_y2 = y1
        # ...or below it when there is no room above the frame top.
        if bg_y1 < 0:
            bg_y1 = y2
            bg_y2 = y2 + total_text_height + 10
        # Semi-transparent black background.
        overlay = frame.copy()
        cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1)
        alpha = 0.6
        cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)
        # Render each line, color-coded by its meaning.
        current_y = bg_y1 + 15
        for i, line in enumerate(text_lines):
            if i == 0:  # status line
                if not is_acceptable:
                    text_color = (128, 128, 128)  # gray - rejected quality
                elif is_match:
                    text_color = (0, 255, 0)      # green - match
                else:
                    text_color = (0, 0, 255)      # red - no match
            # BUG FIX: the clarity line is appended at index 4 (after status,
            # gender/age, Quality, DetScore) and the pose lines at 5-7; the
            # original used i == 3 and [4, 5, 6], coloring the wrong rows.
            elif i == 4:  # clarity line
                if quality_metrics['clarity_score'] >= self.clarity_threshold:
                    text_color = (0, 255, 0)      # green - sharp
                else:
                    text_color = (0, 0, 255)      # red - blurry
            elif i in [5, 6, 7]:  # pose-angle lines
                if abs(quality_metrics['yaw']) > 45 or abs(quality_metrics['pitch']) > 30:
                    text_color = (0, 0, 255)      # red - angle too large
                elif abs(quality_metrics['yaw']) > 30 or abs(quality_metrics['pitch']) > 20:
                    text_color = (0, 255, 255)    # yellow - large angle
                else:
                    text_color = (0, 255, 0)      # green - good angle
            else:
                text_color = (255, 255, 255)      # white - plain info
            cv2.putText(frame, line, (x1 + 5, current_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1)
            current_y += line_heights[i]
        return frame

    def set_quality_thresholds(self, clarity_threshold: float = None, min_face_size: int = None):
        """Update the quality-gate thresholds; None keeps the current value."""
        if clarity_threshold is not None:
            self.clarity_threshold = clarity_threshold
        if min_face_size is not None:
            self.min_face_size = min_face_size
        print(f"✅ 质量阈值更新 - 清晰度: {self.clarity_threshold}, 最小尺寸: {self.min_face_size}")

    def process_video_file(self, video_path: str, output_path: str = None,
                           skip_frames: int = 0, show_preview: bool = True):
        """
        Process a video file frame by frame.

        Args:
            video_path: input video path
            output_path: optional annotated output video path
            skip_frames: number of frames skipped between processed frames
            show_preview: whether to display a live preview window
        """
        if not os.path.exists(video_path):
            print(f"❌ 视频文件不存在: {video_path}")
            return
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"❌ 无法打开视频文件: {video_path}")
            return
        # Source video properties.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
        # Output writer; FPS is reduced to match the frame-skipping rate.
        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height))
        else:
            out = None
        # Frame-processing loop.
        frame_index = 0
        processed_frames = 0
        start_time = time.time()
        print("🚀 开始处理视频...")
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Frame skipping: process 1 of every (skip_frames + 1) frames.
            if skip_frames > 0 and frame_index % (skip_frames + 1) != 0:
                frame_index += 1
                continue
            processed_frame, results = self.process_frame(frame)
            if out:
                out.write(processed_frame)
            if show_preview:
                # Overlay progress/performance info on the preview.
                fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)}"
                cv2.putText(processed_frame, fps_text, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
                cv2.imshow('Video Face Recognition', processed_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            frame_index += 1
            processed_frames += 1
            # Console progress report every 30 frames.
            if frame_index % 30 == 0:
                progress = (frame_index / total_frames) * 100
                print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})")
        # Release resources.
        cap.release()
        if out:
            out.release()
        if show_preview:
            cv2.destroyAllWindows()
        # Summary statistics.
        total_time = time.time() - start_time
        avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0
        print(f"\n🎉 视频处理完成!")
        print(f"📊 性能统计:")
        print(f" 总处理帧数: {processed_frames}")
        print(f" 总耗时: {total_time:.1f}")
        print(f" 平均每帧: {avg_processing_time:.1f}ms")
        print(f" 实际FPS: {processed_frames / total_time:.1f}")
        if output_path:
            print(f" 输出视频: {output_path}")

    def process_webcam(self, camera_id: int = 0, output_path: str = None):
        """
        Run real-time recognition on a webcam stream (press 'q' to quit).
        """
        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            print(f"❌ 无法打开摄像头 {camera_id}")
            return
        # Request a 720p capture resolution (best effort).
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        # Optional recording of the annotated stream.
        if output_path:
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        else:
            out = None
        print("🎥 开始摄像头实时识别 (按 'q' 退出)...")
        while True:
            ret, frame = cap.read()
            if not ret:
                print("❌ 无法读取摄像头帧")
                break
            processed_frame, results = self.process_frame(frame)
            # Live FPS derived from the last frame's processing time.
            current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0
            info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}"
            cv2.putText(processed_frame, info_text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            if out:
                out.write(processed_frame)
            cv2.imshow('Real-time Face Recognition', processed_frame)
            # 'q' quits the loop.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        # Release resources.
        cap.release()
        if out:
            out.release()
        cv2.destroyAllWindows()
        print("✅ 摄像头处理结束")
# 使用示例
def main():
    """Interactive demo with face-quality gating enabled."""
    recognizer = VideoFaceRecognition(use_gpu=True)

    # Quality-gate thresholds; tune per input footage.
    recognizer.set_quality_thresholds(
        clarity_threshold=50.0,
        min_face_size=40,
    )

    # Register a target face if the sample image is present (optional).
    target_image = "test_data/register/sy.jpg"
    if os.path.exists(target_image):
        recognizer.set_target_face(target_image, "目标人物")

    # Ask the user which processing mode to run.
    print("请选择处理模式:")
    print("1. 处理视频文件")
    print("2. 实时摄像头")
    choice = input("请输入选择 (1 或 2): ").strip()

    if choice == "1":
        # Offline video-file mode; skip_frames=1 processes every 2nd frame.
        recognizer.process_video_file(
            video_path="test_data/video/video_1.mp4",
            output_path="test_data/output_video/video_5_quality.mp4",
            skip_frames=1,
            show_preview=True,
        )
    elif choice == "2":
        # Live webcam mode, recording the annotated stream to disk.
        recognizer.process_webcam(camera_id=0, output_path="webcam_recording.mp4")
    else:
        print("❌ 无效选择")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,439 @@
# video_face_recognition.py
import cv2
import numpy as np
import time
from insightface.app import FaceAnalysis
from typing import List, Dict, Tuple, Optional
import os
import glob
#黑白名单
class VideoFaceRecognition:
"""
视频人脸识别系统
支持实时视频流和视频文件处理
支持黑名单和白名单模式
"""
def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True):
# 初始化人脸识别模型
self.app = FaceAnalysis(name=model_name)
self.app.prepare(
ctx_id=0 if use_gpu else -1,
det_thresh=0.39,
det_size=(640, 640)
)
# 名单相关变量
self.list_mode = "blacklist" # "blacklist" 或 "whitelist"
self.registered_faces = {} # {name: embedding}
self.similarity_threshold = 0.3
# 性能统计
self.frame_count = 0
self.processing_times = []
print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}")
def set_list_mode(self, mode: str):
"""设置名单模式"""
if mode.lower() in ["blacklist", "whitelist"]:
self.list_mode = mode.lower()
print(f"✅ 名单模式设置为: {self.list_mode}")
else:
print("❌ 无效的名单模式,请使用 'blacklist''whitelist'")
def load_registered_faces(self, register_dir: str):
"""
从目录加载注册的人脸图片
文件名(去掉后缀)即为人的名字
"""
if not os.path.exists(register_dir):
print(f"❌ 注册目录不存在: {register_dir}")
return False
# 支持的图片格式
image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp']
image_files = []
for ext in image_extensions:
image_files.extend(glob.glob(os.path.join(register_dir, ext)))
image_files.extend(glob.glob(os.path.join(register_dir, ext.upper())))
if not image_files:
print(f"❌ 在目录 {register_dir} 中未找到图片文件")
return False
loaded_count = 0
for image_path in image_files:
# 获取文件名(不含扩展名)作为人名
person_name = os.path.splitext(os.path.basename(image_path))[0]
# 读取图片并提取人脸特征
img = cv2.imread(image_path)
if img is None:
print(f"❌ 无法读取图片: {image_path}")
continue
faces = self.app.get(img)
if not faces:
print(f"❌ 图片中未检测到人脸: {image_path}")
continue
# 使用第一张检测到的人脸
self.registered_faces[person_name] = faces[0].embedding
loaded_count += 1
print(f"✅ 加载注册人脸: {person_name}")
print(f"🎉 成功加载 {loaded_count} 张注册人脸")
return loaded_count > 0
def find_best_match(self, embedding: np.ndarray) -> Tuple[Optional[str], float]:
"""
在注册人脸中查找最佳匹配
返回: (匹配的人名, 相似度)
"""
if not self.registered_faces:
return None, 0.0
best_similarity = 0.0
best_name = None
# 归一化查询嵌入
query_emb = embedding / np.linalg.norm(embedding)
for name, registered_embedding in self.registered_faces.items():
# 归一化注册嵌入
reg_emb = registered_embedding / np.linalg.norm(registered_embedding)
# 计算余弦相似度
similarity = float(np.dot(query_emb, reg_emb))
if similarity > best_similarity:
best_similarity = similarity
best_name = name
return best_name, best_similarity
def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
"""
处理单帧图像
返回: (处理后的帧, 识别结果列表)
"""
start_time = time.time()
# 人脸检测和识别
faces = self.app.get(frame)
results = []
for face in faces:
# 查找最佳匹配
best_name, similarity = self.find_best_match(face.embedding)
# 根据名单模式判断是否匹配
if self.list_mode == "blacklist":
# 黑名单模式:在黑名单中即为匹配(需要关注)
is_match = best_name is not None and similarity >= self.similarity_threshold
else: # whitelist
# 白名单模式:在白名单中即为匹配(允许通过)
is_match = best_name is not None and similarity >= self.similarity_threshold
result = {
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'best_match': best_name,
'is_match': is_match,
'gender': 'Male' if face.gender == 1 else 'Female',
'age': int(face.age),
'det_score': float(face.det_score)
}
results.append(result)
# 在帧上绘制结果
frame = self._draw_detection(frame, result)
# 性能统计
processing_time = (time.time() - start_time) * 1000
self.processing_times.append(processing_time)
self.frame_count += 1
return frame, results
def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
"""在帧上绘制检测结果"""
bbox = result['bbox']
similarity = result['similarity']
is_match = result['is_match']
best_match = result['best_match']
# 选择颜色 - 根据名单模式
if self.list_mode == "blacklist":
# 黑名单模式:匹配(在黑名单中)显示红色,不匹配显示绿色
color = (0, 0, 255) if is_match else (0, 255, 0) # 红色-黑名单, 绿色-正常
else: # whitelist
# 白名单模式:匹配(在白名单中)显示绿色,不匹配显示红色
color = (0, 255, 0) if is_match else (0, 0, 255) # 绿色-白名单, 红色-陌生人
# 绘制人脸框
x1, y1, x2, y2 = bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
# 构建显示文本
if best_match and similarity >= self.similarity_threshold:
name_text = f"{best_match}: {similarity:.3f}"
else:
name_text = f"Unknown: {similarity:.3f}"
# 添加名单状态
status = "MATCH" if is_match else "NO MATCH"
text = f"{status} | {name_text}"
# 文本背景
text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
cv2.rectangle(frame, (x1, y1 - text_size[1] - 10),
(x1 + text_size[0], y1), color, -1)
# 文本
cv2.putText(frame, text, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
# 详细信息
info_text = f"{result['gender']}/{result['age']}"
cv2.putText(frame, info_text, (x1, y2 + 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
return frame
def process_video_file(self, video_path: str, output_path: str = None,
skip_frames: int = 0, show_preview: bool = True):
"""
处理视频文件
Args:
video_path: 输入视频路径
output_path: 输出视频路径
skip_frames: 跳帧数,用于提高处理速度
show_preview: 是否显示实时预览
"""
if not os.path.exists(video_path):
print(f"❌ 视频文件不存在: {video_path}")
return
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 无法打开视频文件: {video_path}")
return
# 获取视频信息
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
print(f"🎯 当前模式: {self.list_mode}, 注册人脸数: {len(self.registered_faces)}")
# 设置输出视频
if output_path:
# 确保输出目录存在
os.makedirs(os.path.dirname(output_path), exist_ok=True)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height))
else:
out = None
# 处理视频帧
frame_index = 0
processed_frames = 0
start_time = time.time()
print("🚀 开始处理视频...")
while True:
ret, frame = cap.read()
if not ret:
break
# 跳帧处理
if skip_frames > 0 and frame_index % (skip_frames + 1) != 0:
frame_index += 1
continue
# 处理当前帧
processed_frame, results = self.process_frame(frame)
# 写入输出视频
if out:
out.write(processed_frame)
# 显示预览
if show_preview:
# 添加性能信息
fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)} | Mode: {self.list_mode}"
cv2.putText(processed_frame, fps_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
# 添加名单统计
match_count = sum(1 for r in results if r['is_match'])
list_text = f"Match: {match_count}/{len(results)}"
cv2.putText(processed_frame, list_text, (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
cv2.imshow('Video Face Recognition', processed_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_index += 1
processed_frames += 1
# 进度显示
if frame_index % 30 == 0:
progress = (frame_index / total_frames) * 100
print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})")
# 清理资源
cap.release()
if out:
out.release()
if show_preview:
cv2.destroyAllWindows()
# 性能统计
total_time = time.time() - start_time
avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0
print(f"\n🎉 视频处理完成!")
print(f"📊 性能统计:")
print(f" 总处理帧数: {processed_frames}")
print(f" 总耗时: {total_time:.1f}")
print(f" 平均每帧: {avg_processing_time:.1f}ms")
print(f" 实际FPS: {processed_frames / total_time:.1f}")
if output_path:
print(f" 输出视频: {output_path}")
def process_webcam(self, camera_id: int = 0, output_path: str = None):
    """Run real-time list-mode recognition on a webcam stream.

    Args:
        camera_id: OpenCV camera index to open.
        output_path: Optional path for an annotated recording; None disables saving.
    """
    cap = cv2.VideoCapture(camera_id)
    if not cap.isOpened():
        print(f"❌ 无法打开摄像头 {camera_id}")
        return
    # Request a capture resolution (the driver may ignore it).
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
    # Set up the optional output writer.
    if output_path:
        # Fix: os.path.dirname() returns "" for a bare filename (as used by
        # main()), and os.makedirs("") raises FileNotFoundError — fall back to ".".
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30.0  # webcams frequently report 0 FPS; use a sane default
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    else:
        out = None
    print(f"🎥 开始摄像头实时识别 - 模式: {self.list_mode} (按 'q' 退出)...")
    print(f"📋 注册人脸数: {len(self.registered_faces)}")
    while True:
        ret, frame = cap.read()
        if not ret:
            print("❌ 无法读取摄像头帧")
            break
        # Detect, match and annotate the current frame.
        processed_frame, results = self.process_frame(frame)
        # Overlay instantaneous FPS / face count / list mode.
        current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0
        info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)} | Mode: {self.list_mode}"
        cv2.putText(processed_frame, info_text, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        # Overlay match statistics.
        match_count = sum(1 for r in results if r['is_match'])
        list_text = f"Match: {match_count}/{len(results)}"
        cv2.putText(processed_frame, list_text, (10, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        # Write to the recording, if any.
        if out:
            out.write(processed_frame)
        # Live preview.
        cv2.imshow('Real-time Face Recognition', processed_frame)
        # Press 'q' to quit.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    # Release resources.
    cap.release()
    if out:
        out.release()
    cv2.destroyAllWindows()
    print("✅ 摄像头处理结束")
# 使用示例
def main():
    """Example driver: whitelist recognition over a test video clip."""
    # Create the video recognition system.
    video_system = VideoFaceRecognition(use_gpu=True)
    # Select the list mode.
    # video_system.set_list_mode("blacklist")  # blacklist mode
    video_system.set_list_mode("whitelist")  # whitelist mode
    # Load the enrollment gallery.
    register_dir = "test_data/register"  # directory of enrollment images
    if os.path.exists(register_dir):
        video_system.load_registered_faces(register_dir)
    else:
        print(f"⚠️ 注册目录不存在: {register_dir}")
    # # 选择处理模式:
    # print("请选择处理模式:")
    # print("1. 处理视频文件")
    # print("2. 实时摄像头")
    #
    # choice = input("请输入选择 (1 或 2): ").strip()
    choice = "1"  # hard-coded: always process the video file below
    if choice == "1":
        # Process a video file.
        video_path = "test_data/video/video_1.mp4"
        output_path = "test_data/output_video/video_1_white.mp4"
        # Performance: frame skipping.
        skip_frames = 1  # process 1 of every 2 frames for speed
        video_system.process_video_file(
            video_path=video_path,
            output_path=output_path,
            skip_frames=skip_frames,
            show_preview=True
        )
    elif choice == "2":
        # Real-time webcam.
        output_path = "webcam_recording.mp4"  # optional: save the recording
        video_system.process_webcam(
            camera_id=0,
            output_path=output_path
        )
    else:
        print("❌ 无效选择")
# Script entry point.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,571 @@
# video_face_recognition.py
import cv2
import numpy as np
import time
from insightface.app import FaceAnalysis
from typing import List, Dict, Tuple
import os
from sympy import false
#改进后的人脸质量显示
class VideoFaceRecognition:
"""
视频人脸识别系统
支持实时视频流和视频文件处理
"""
def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True):
    """Initialize the detector/recognizer and the quality thresholds.

    Args:
        model_name: insightface model pack name.
        use_gpu: run inference on GPU (ctx_id 0) or CPU (ctx_id -1).
    """
    # Quality threshold settings.
    self.det_size = 640  # detector input size: 320 fast, 640 medium, 1280 slow
    self.det_threshold = 0.7  # face detection confidence
    self.clarity_threshold = 1000.0  # Laplacian-variance sharpness; lower means blurry
    self.min_face_size = 30  # minimum face size in pixels
    self.pitch_threshold = 30  # max abs pitch angle accepted
    self.yaw_threshold = 20  # max abs yaw angle accepted
    self.quality_threshold = 0.6  # overall quality-score threshold
    self.similarity_threshold = 0.1
    # Initialize the face analysis model.
    self.app = FaceAnalysis(name=model_name)
    self.app.prepare(
        ctx_id=0 if use_gpu else -1,
        det_thresh=self.det_threshold,
        # Fix: honor the configured self.det_size instead of a hard-coded 640,
        # consistent with the other variants of this class in the project.
        det_size=(self.det_size, self.det_size)
    )
    # Reference (target) face, set later via set_target_face().
    self.target_embedding = None
    self.target_id = None
    # Performance statistics.
    self.frame_count = 0
    self.processing_times = []
    print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}")
def set_target_face(self, image_path: str, person_id: str = "target") -> bool:
    """Enroll the reference face from an image file.

    Returns True on success, False when the image is unreadable or no face
    is detected.
    """
    image = cv2.imread(image_path)
    if image is None:
        print(f"❌ 无法读取目标图像: {image_path}")
        return False
    detections = self.app.get(image)
    if not detections:
        print(f"❌ 目标图像中未检测到人脸: {image_path}")
        return False
    # Keep the first detected face as the comparison target.
    self.target_embedding = detections[0].embedding
    self.target_id = person_id
    print(f"✅ 目标人脸设置: {person_id}")
    return True
def calculate_clarity(self, face_region: np.ndarray) -> float:
    """Estimate the sharpness of a face crop.

    Uses the variance-of-Laplacian measure: higher values indicate a
    sharper image.
    """
    # Collapse color images to grayscale before filtering.
    gray = (cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY)
            if len(face_region.shape) == 3 else face_region)
    # Variance of the Laplacian response is a standard blur metric.
    return cv2.Laplacian(gray, cv2.CV_64F).var()
def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]:
    """
    Decide whether a detected face is good enough to use.

    Returns:
        (is_acceptable, quality_metrics): the boolean verdict plus a dict of
        every individual score that fed into it.
    """
    quality_metrics = {}
    is_acceptable = True
    # 1. Detection confidence.
    quality_metrics['det_score'] = float(face.det_score)
    # 2. Head pose angles.
    if hasattr(face, 'pose') and face.pose is not None:
        pitch, yaw, roll = face.pose
        quality_metrics['pitch'] = float(pitch)
        quality_metrics['yaw'] = float(yaw)
        quality_metrics['roll'] = float(roll)
    else:
        # No pose available: sentinel angles large enough to fail the checks.
        quality_metrics['pitch'] = 100.0
        quality_metrics['yaw'] = 100.0
        quality_metrics['roll'] = 100.0
    # 3. Bounding-box geometry.
    bbox = face.bbox
    x1, y1, x2, y2 = bbox.astype(int)
    width = x2 - x1
    height = y2 - y1
    quality_metrics['bbox_width'] = width
    quality_metrics['bbox_height'] = height
    quality_metrics['bbox_area'] = width * height
    quality_metrics['aspect_ratio'] = width / height if height > 0 else 0
    # 4. Sharpness of the face crop.
    # Clip the box to the frame bounds before extracting the region.
    h, w = frame.shape[:2]
    x1_clip = max(0, x1)
    y1_clip = max(0, y1)
    x2_clip = min(w, x2)
    y2_clip = min(h, y2)
    if x2_clip > x1_clip and y2_clip > y1_clip:
        face_region = frame[y1_clip:y2_clip, x1_clip:x2_clip]
        clarity_score = self.calculate_clarity(face_region)
        quality_metrics['clarity_score'] = clarity_score
    else:
        # Degenerate / fully out-of-frame box: treat as completely blurry.
        quality_metrics['clarity_score'] = 0.0
    # 5. Combined quality score: start from the detection confidence and
    # subtract penalties; any failed check also rejects the face outright.
    base_score = quality_metrics['det_score']
    # Sharpness penalty.
    clarity_penalty = 0.0
    if quality_metrics['clarity_score'] < self.clarity_threshold:
        clarity_penalty = 0.3  # heavy penalty for insufficient sharpness
        is_acceptable = False
    # Pose penalty.
    pose_penalty = 0.0
    if abs(quality_metrics['yaw']) > self.yaw_threshold:
        pose_penalty += 0.2
        is_acceptable = False
    if abs(quality_metrics['pitch']) > self.pitch_threshold:
        pose_penalty += 0.2
        is_acceptable = False
    # Size penalty.
    size_penalty = 0.0
    # if quality_metrics['bbox_area'] < (self.min_face_size ** 2):
    #     size_penalty = 0.2
    if min(width, height) < self.min_face_size:
        is_acceptable = False
        size_penalty = 0.2
    quality_metrics['quality_score'] = max(0.1, base_score - clarity_penalty - pose_penalty - size_penalty)
    # # Earlier hard-threshold acceptance rule, kept for reference:
    # is_acceptable = (
    #     quality_metrics['det_score'] > 0.5 and
    #     quality_metrics['clarity_score'] >= self.clarity_threshold and
    #     quality_metrics['bbox_area'] >= (self.min_face_size ** 2) and
    #     abs(quality_metrics['yaw']) < 60 and
    #     abs(quality_metrics['pitch']) < 45
    # )
    return is_acceptable, quality_metrics
def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
    """
    Process a single frame.

    Returns:
        (annotated frame, list of per-face result dicts)
    """
    start_time = time.time()
    # Face detection + embedding extraction.
    faces = self.app.get(frame)
    results = []
    for face in faces:
        # Quality gate first.
        is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
        similarity = 0.0
        # Compare against the target only when the face passed the quality gate.
        if is_acceptable and self.target_embedding is not None:
            # Cosine similarity between L2-normalized embeddings.
            emb1 = face.embedding / np.linalg.norm(face.embedding)
            emb2 = self.target_embedding / np.linalg.norm(self.target_embedding)
            similarity = float(np.dot(emb1, emb2))
        result = {
            'bbox': face.bbox.astype(int).tolist(),
            'similarity': similarity,
            'is_match': similarity >= self.similarity_threshold,
            'gender': 'Male' if face.gender == 1 else 'Female',
            'age': int(face.age),
            'det_score': float(face.det_score),
            'quality_metrics': quality_metrics,
            'is_acceptable': is_acceptable  # whether the face passed the quality checks
        }
        results.append(result)
        # Draw the annotations for this face onto the frame.
        frame = self._draw_detection(frame, result)
    # Performance bookkeeping (milliseconds per frame).
    processing_time = (time.time() - start_time) * 1000
    self.processing_times.append(processing_time)
    self.frame_count += 1
    return frame, results
def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
    """Draw the detection box and a quality-info text panel for one face."""
    bbox = result['bbox']
    similarity = result['similarity']
    is_match = result['is_match']
    is_acceptable = result['is_acceptable']
    quality_metrics = result['quality_metrics']
    # Box color: gray for rejected quality, green for match, red otherwise.
    if not is_acceptable:
        color = (128, 128, 128)  # gray - quality not acceptable
    elif is_match:
        color = (0, 255, 0)  # green - match
    else:
        color = (0, 0, 255)  # red - no match
    # Draw the face box.
    x1, y1, x2, y2 = bbox
    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
    # Build the text panel — key information only.
    text_lines = []
    # Line 0: match status.
    if not is_acceptable:
        text_lines.append("LOW QUALITY")
    elif self.target_id:
        status = f"MATCH: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}"
        text_lines.append(status)
    else:
        text_lines.append(f"Similarity: {similarity:.3f}")
    # Line 1: overall quality score.
    text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}")
    # Line 2: detection score.
    text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}")
    # Line 3: sharpness.
    text_lines.append(f"Clarity: {quality_metrics['clarity_score']:.1f}")
    # Lines 4-5: pose angles.
    text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°")
    text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°")
    # text_lines.append(f"Roll: {quality_metrics['roll']:.1f}°")
    # Lines 6-7: box size.
    text_lines.append(f"Width: {quality_metrics['bbox_width']:.1f}")
    text_lines.append(f"Height: {quality_metrics['bbox_height']:.1f}")
    # Measure the text block.
    max_text_width = 0
    total_text_height = 0
    line_heights = []
    for line in text_lines:
        (text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
        max_text_width = max(max_text_width, text_width)
        line_heights.append(text_height + baseline)
        total_text_height += text_height + baseline + 2
    # Background rectangle above the face box.
    bg_x1 = x1
    bg_y1 = y1 - total_text_height - 10
    bg_x2 = x1 + max_text_width + 10
    bg_y2 = y1
    # If it would extend past the top of the image, move it below the box.
    if bg_y1 < 0:
        bg_y1 = y2
        bg_y2 = y2 + total_text_height + 10
    # Semi-transparent background.
    overlay = frame.copy()
    cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1)
    alpha = 0.6
    cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)
    # Render the lines. NOTE: the indices below must stay in sync with the
    # order the lines were appended to text_lines above.
    current_y = bg_y1 + 15
    for i, line in enumerate(text_lines):
        if i == 0:  # status line
            if not is_acceptable:
                text_color = (128, 128, 128)  # gray - poor quality
            elif is_match:
                text_color = (0, 255, 0)  # green - match
            else:
                text_color = (0, 0, 255)  # red - no match
        # elif i == 1:  # quality-score line
        #     if quality_metrics['quality_score'] >= self.quality_threshold:
        #         text_color = (0, 255, 0)  # green - high quality
        #     else:
        #         text_color = (0, 0, 255)  # red - low quality
        elif i == 3:  # clarity line
            # Red when below the sharpness threshold.
            if quality_metrics['clarity_score'] >= self.clarity_threshold:
                text_color = (255, 255, 255)
            else:
                text_color = (0, 0, 255)
        elif i == 4:  # pitch
            if abs(quality_metrics['pitch']) > self.pitch_threshold:
                text_color = (0, 0, 255)  # red
            else:
                text_color = (255, 255, 255)
        elif i == 5:  # yaw
            if abs(quality_metrics['yaw']) > self.yaw_threshold:
                text_color = (0, 0, 255)  # red
            else:
                text_color = (255, 255, 255)
        elif i == 6:  # box width
            if quality_metrics['bbox_width'] < self.min_face_size:
                text_color = (0, 0, 255)  # red
            else:
                text_color = (255, 255, 255)
        elif i == 7:  # box height
            if quality_metrics['bbox_height'] < self.min_face_size:
                text_color = (0, 0, 255)  # red
            else:
                text_color = (255, 255, 255)
        else:
            text_color = (255, 255, 255)  # white - other info
        cv2.putText(frame, line, (x1 + 5, current_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1)
        current_y += line_heights[i]
    return frame
def set_quality_thresholds(self, clarity_threshold: float = None,
quality_threshold: float = None,
min_face_size: int = None):
"""设置质量阈值"""
if clarity_threshold is not None:
self.clarity_threshold = clarity_threshold
if quality_threshold is not None:
self.quality_threshold = quality_threshold
if min_face_size is not None:
self.min_face_size = min_face_size
print(
f"✅ 质量阈值更新 - 清晰度: {self.clarity_threshold}, 质量得分: {self.quality_threshold}, 最小尺寸: {self.min_face_size}")
def process_video_file(self, video_path: str, output_path: str = None,
                       skip_frames: int = 0, show_preview: bool = True):
    """
    Process a video file frame by frame.

    Args:
        video_path: input video path
        output_path: annotated output video path (None disables writing)
        skip_frames: frames to skip between processed frames (0 = process all)
        show_preview: show a live preview window
    """
    if not os.path.exists(video_path):
        print(f"❌ 视频文件不存在: {video_path}")
        return
    # Open the video file.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"❌ 无法打开视频文件: {video_path}")
        return
    # Source video properties.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
    # Set up the output writer.
    if output_path:
        # Fix: create the output directory first — cv2.VideoWriter fails
        # silently when the directory does not exist. dirname() is "" for a
        # bare filename, hence the "." fallback.
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # Output FPS is reduced to match the frame-skipping rate.
        out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height))
    else:
        out = None
    # Frame loop.
    frame_index = 0
    processed_frames = 0
    start_time = time.time()
    print("🚀 开始处理视频...")
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Frame skipping: process only every (skip_frames + 1)-th frame.
        if skip_frames > 0 and frame_index % (skip_frames + 1) != 0:
            frame_index += 1
            continue
        # Process the current frame.
        processed_frame, results = self.process_frame(frame)
        # Write to the output video.
        if out:
            out.write(processed_frame)
        # Live preview.
        if show_preview:
            # Overlay progress / diagnostics.
            fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)}"
            cv2.putText(processed_frame, fps_text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            cv2.imshow('Video Face Recognition', processed_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        frame_index += 1
        processed_frames += 1
        # Periodic progress report.
        if frame_index % 30 == 0:
            progress = (frame_index / total_frames) * 100
            print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})")
    # Release resources.
    cap.release()
    if out:
        out.release()
    if show_preview:
        cv2.destroyAllWindows()
    # Performance summary.
    total_time = time.time() - start_time
    avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0
    print(f"\n🎉 视频处理完成!")
    print(f"📊 性能统计:")
    print(f" 总处理帧数: {processed_frames}")
    print(f" 总耗时: {total_time:.1f}秒")
    print(f" 平均每帧: {avg_processing_time:.1f}ms")
    print(f" 实际FPS: {processed_frames / total_time:.1f}")
    if output_path:
        print(f" 输出视频: {output_path}")
def process_webcam(self, camera_id: int = 0, output_path: str = None):
    """Run real-time recognition on a webcam stream.

    Args:
        camera_id: OpenCV camera index to open.
        output_path: Optional path for an annotated recording; None disables saving.
    """
    cap = cv2.VideoCapture(camera_id)
    if not cap.isOpened():
        print(f"❌ 无法打开摄像头 {camera_id}")
        return
    # Request a capture resolution (the driver may ignore it).
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
    # Set up the optional output writer.
    if output_path:
        # Fix: ensure the output directory exists so VideoWriter does not fail
        # silently; "." fallback covers bare filenames (dirname == "").
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30.0  # webcams frequently report 0 FPS; use a sane default
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    else:
        out = None
    print("🎥 开始摄像头实时识别 (按 'q' 退出)...")
    while True:
        ret, frame = cap.read()
        if not ret:
            print("❌ 无法读取摄像头帧")
            break
        # Process the current frame.
        processed_frame, results = self.process_frame(frame)
        # Overlay instantaneous FPS (from the last per-frame time in ms).
        current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0
        info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)}"
        cv2.putText(processed_frame, info_text, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        # Write to the recording, if any.
        if out:
            out.write(processed_frame)
        # Live preview.
        cv2.imshow('Real-time Face Recognition', processed_frame)
        # Press 'q' to quit.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    # Release resources.
    cap.release()
    if out:
        out.release()
    cv2.destroyAllWindows()
    print("✅ 摄像头处理结束")
# 使用示例
def main():
    """Example driver for the quality-aware single-target recognizer."""
    # Create the video recognition system.
    video_system = VideoFaceRecognition(use_gpu=True)
    # Quality thresholds (tune for the actual footage).
    video_system.set_quality_thresholds(
        clarity_threshold=1000.0,  # sharpness threshold
        quality_threshold=0.6,  # overall quality-score threshold
        min_face_size=30
    )
    # Optional target face.
    target_image = "test_data/register/sy.jpg"
    if os.path.exists(target_image):
        video_system.set_target_face(target_image, "目标人物")
    # # 选择处理模式:
    # print("请选择处理模式:")
    # print("1. 处理视频文件")
    # print("2. 实时摄像头")
    #
    # choice = input("请输入选择 (1 或 2): ").strip()
    choice = "1"  # hard-coded: always process the video file below
    if choice == "1":
        # Process a video file.
        video_path = "test_data/video/video_1.mp4"
        output_path = "test_data/output_video/video_6_quality.mp4"
        # Performance: frame skipping.
        skip_frames = 1
        video_system.process_video_file(
            video_path=video_path,
            output_path=output_path,
            skip_frames=skip_frames,
            show_preview=True
        )
    elif choice == "2":
        # Real-time webcam.
        output_path = "webcam_recording.mp4"
        video_system.process_webcam(
            camera_id=0,
            output_path=output_path
        )
    else:
        print("❌ 无效选择")
# Script entry point.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,684 @@
# video_face_recognition.py
import cv2
import numpy as np
import time
from insightface.app import FaceAnalysis
from typing import List, Dict, Tuple, Optional
import os
import glob
#改进后的人脸质量显示
class VideoFaceRecognition:
"""
视频人脸识别系统
支持实时视频流和视频文件处理
支持黑名单和白名单模式
"""
# buffalo_l buffalo_sc
def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True):
    """Initialize the detector, the thresholds, and the enrollment gallery.

    Args:
        model_name: insightface model pack ('buffalo_l', 'buffalo_sc', ...).
        use_gpu: run inference on GPU (ctx_id 0) or CPU (ctx_id -1).
    """
    # Quality threshold settings.
    self.det_size = 640  # detector input size: 320 fast, 640 medium, 1280 slow
    # Whitelist preset.
    self.list_mode = "whitelist"  # "blacklist" or "whitelist"
    self.det_threshold = 0.7  # face detection confidence
    self.clarity_threshold = 1000.0  # sharpness threshold; below this the face is considered blurry
    self.min_face_size = 30  # minimum face size in pixels
    self.pitch_threshold = 40  # max abs pitch angle accepted
    self.yaw_threshold = 40  # max abs yaw angle accepted
    self.quality_threshold = 0.6  # overall quality-score threshold
    self.similarity_threshold = 0.13
    # # Blacklist preset (kept for reference):
    # self.list_mode = "blacklist"  # "blacklist" 或 "whitelist"
    # self.det_threshold = 0.5  # 人脸置信度
    # self.clarity_threshold = 100.0  # 清晰度阈值,低于此值认为人脸模糊
    # self.min_face_size = 20  # 最小人脸像素尺寸
    # self.pitch_threshold = 90  #
    # self.yaw_threshold = 90  #
    # self.quality_threshold = 0.6  # 质量得分阈值
    # self.similarity_threshold = 0.3
    # Initialize the face analysis model.
    self.app = FaceAnalysis(name=model_name)
    self.app.prepare(
        ctx_id=0 if use_gpu else -1,
        det_thresh=self.det_threshold,
        det_size=(self.det_size, self.det_size)
    )
    # Enrollment gallery: {name: embedding}.
    self.registered_faces = {}
    # Performance statistics.
    self.frame_count = 0
    self.processing_times = []
    print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}")
def set_list_mode(self, mode: str):
"""设置名单模式"""
if mode.lower() in ["blacklist", "whitelist"]:
self.list_mode = mode.lower()
print(f"✅ 名单模式设置为: {self.list_mode}")
else:
print("❌ 无效的名单模式,请使用 'blacklist''whitelist'")
def load_registered_faces(self, register_dir: str):
    """
    Load enrollment images from a directory.

    The filename (without extension) is used as the person's name.

    Returns:
        True when at least one face was enrolled, otherwise False.
    """
    if not os.path.exists(register_dir):
        print(f"❌ 注册目录不存在: {register_dir}")
        return False
    # Supported image formats (lower- and upper-case extensions).
    image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp']
    image_files = []
    for ext in image_extensions:
        image_files.extend(glob.glob(os.path.join(register_dir, ext)))
        image_files.extend(glob.glob(os.path.join(register_dir, ext.upper())))
    if not image_files:
        print(f"❌ 在目录 {register_dir} 中未找到图片文件")
        return False
    loaded_count = 0
    for image_path in image_files:
        # Filename (without extension) becomes the person's name.
        person_name = os.path.splitext(os.path.basename(image_path))[0]
        # Read the image and extract the face embedding.
        img = cv2.imread(image_path)
        if img is None:
            print(f"❌ 无法读取图片: {image_path}")
            continue
        faces = self.app.get(img)
        if not faces:
            print(f"❌ 图片中未检测到人脸: {image_path}")
            continue
        # Use the first detected face.
        self.registered_faces[person_name] = faces[0].embedding
        loaded_count += 1
        print(f"✅ 加载注册人脸: {person_name}")
    print(f"🎉 成功加载 {loaded_count} 张注册人脸")
    return loaded_count > 0
def find_best_match(self, embedding: np.ndarray) -> Tuple[Optional[str], float]:
"""
在注册人脸中查找最佳匹配
返回: (匹配的人名, 相似度)
"""
if not self.registered_faces:
return None, 0.0
best_similarity = 0.0
best_name = None
# 归一化查询嵌入
query_emb = embedding / np.linalg.norm(embedding)
for name, registered_embedding in self.registered_faces.items():
# 归一化注册嵌入
reg_emb = registered_embedding / np.linalg.norm(registered_embedding)
# 计算余弦相似度
similarity = float(np.dot(query_emb, reg_emb))
if similarity > best_similarity:
best_similarity = similarity
best_name = name
return best_name, best_similarity
def calculate_clarity(self, face_region: np.ndarray) -> float:
    """Estimate the sharpness of a face crop.

    Uses the variance-of-Laplacian measure: higher values indicate a
    sharper image.
    """
    # Collapse color images to grayscale before filtering.
    gray = (cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY)
            if len(face_region.shape) == 3 else face_region)
    # Variance of the Laplacian response is a standard blur metric.
    return cv2.Laplacian(gray, cv2.CV_64F).var()
def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]:
    """
    Decide whether a detected face is good enough to use.

    Returns:
        (is_acceptable, quality_metrics): the boolean verdict plus a dict of
        every individual score that fed into it.
    """
    quality_metrics = {}
    is_acceptable = True
    # 1. Detection confidence.
    quality_metrics['det_score'] = float(face.det_score)
    # 2. Head pose angles.
    if hasattr(face, 'pose') and face.pose is not None:
        pitch, yaw, roll = face.pose
        quality_metrics['pitch'] = float(pitch)
        quality_metrics['yaw'] = float(yaw)
        quality_metrics['roll'] = float(roll)
    else:
        # No pose available: sentinel angles large enough to fail the checks.
        quality_metrics['pitch'] = 100.0
        quality_metrics['yaw'] = 100.0
        quality_metrics['roll'] = 100.0
    # 3. Bounding-box geometry.
    bbox = face.bbox
    x1, y1, x2, y2 = bbox.astype(int)
    width = x2 - x1
    height = y2 - y1
    quality_metrics['bbox_width'] = width
    quality_metrics['bbox_height'] = height
    quality_metrics['bbox_area'] = width * height
    quality_metrics['aspect_ratio'] = width / height if height > 0 else 0
    # 4. Sharpness of the face crop.
    # Clip the box to the frame bounds before extracting the region.
    h, w = frame.shape[:2]
    x1_clip = max(0, x1)
    y1_clip = max(0, y1)
    x2_clip = min(w, x2)
    y2_clip = min(h, y2)
    if x2_clip > x1_clip and y2_clip > y1_clip:
        face_region = frame[y1_clip:y2_clip, x1_clip:x2_clip]
        clarity_score = self.calculate_clarity(face_region)
        quality_metrics['clarity_score'] = clarity_score
    else:
        # Degenerate / fully out-of-frame box: treat as completely blurry.
        quality_metrics['clarity_score'] = 0.0
    # 5. Combined quality score: start from the detection confidence and
    # subtract penalties; any failed check also rejects the face outright.
    base_score = quality_metrics['det_score']
    # Sharpness penalty.
    clarity_penalty = 0.0
    if quality_metrics['clarity_score'] < self.clarity_threshold:
        clarity_penalty = 0.3  # heavy penalty for insufficient sharpness
        is_acceptable = False
    # Pose penalty.
    pose_penalty = 0.0
    if abs(quality_metrics['yaw']) > self.yaw_threshold:
        pose_penalty += 0.2
        is_acceptable = False
    if abs(quality_metrics['pitch']) > self.pitch_threshold:
        pose_penalty += 0.2
        is_acceptable = False
    # Size penalty.
    size_penalty = 0.0
    # if quality_metrics['bbox_area'] < (self.min_face_size ** 2):
    #     size_penalty = 0.2
    if min(width, height) < self.min_face_size:
        is_acceptable = False
        size_penalty = 0.2
    quality_metrics['quality_score'] = max(0.1, base_score - clarity_penalty - pose_penalty - size_penalty)
    # # Earlier hard-threshold acceptance rule, kept for reference:
    # is_acceptable = (
    #     quality_metrics['det_score'] > 0.5 and
    #     quality_metrics['clarity_score'] >= self.clarity_threshold and
    #     quality_metrics['bbox_area'] >= (self.min_face_size ** 2) and
    #     abs(quality_metrics['yaw']) < 60 and
    #     abs(quality_metrics['pitch']) < 45
    # )
    return is_acceptable, quality_metrics
def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
    """
    Process a single frame.

    Returns:
        (annotated frame, list of per-face result dicts)
    """
    start_time = time.time()
    # Face detection + embedding extraction.
    faces = self.app.get(frame)
    results = []
    for face in faces:
        # Quality check (informational here — matching runs regardless).
        is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
        # Best match against the enrolled gallery.
        best_name, similarity = self.find_best_match(face.embedding)
        # Interpret the match according to the list mode.
        # NOTE(review): both branches currently compute the identical
        # condition; the mode only changes how the result is colored and
        # interpreted downstream.
        if self.list_mode == "blacklist":
            # Blacklist: a gallery hit means "flag this person".
            is_match = best_name is not None and similarity >= self.similarity_threshold
        else:  # whitelist
            # Whitelist: a gallery hit means "allowed through".
            is_match = best_name is not None and similarity >= self.similarity_threshold
        result = {
            'bbox': face.bbox.astype(int).tolist(),
            'similarity': similarity,
            'best_match': best_name,
            'is_match': is_match,
            # 'gender': 'Male' if face.gender == 1 else 'Female',
            # 'age': int(face.age),
            'det_score': float(face.det_score),
            'quality_metrics': quality_metrics,
            'is_acceptable': is_acceptable  # whether the face passed the quality checks
        }
        results.append(result)
        # Draw the annotations for this face onto the frame.
        frame = self._draw_detection(frame, result)
    # Performance bookkeeping (milliseconds per frame).
    processing_time = (time.time() - start_time) * 1000
    self.processing_times.append(processing_time)
    self.frame_count += 1
    return frame, results
def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
    """Draw the detection box and a quality-info text panel for one face."""
    bbox = result['bbox']
    similarity = result['similarity']
    is_match = result['is_match']
    is_acceptable = result['is_acceptable']
    quality_metrics = result['quality_metrics']
    best_match = result['best_match']
    # Box color.
    if not is_acceptable:
        color = (128, 128, 128)  # gray - quality not acceptable
    else:
        # Color depends on the list mode.
        if self.list_mode == "blacklist":
            # Blacklist: a hit is an alert (red); unknown faces are normal (green).
            color = (0, 0, 255) if is_match else (0, 255, 0)  # red-blacklisted, green-normal
        else:  # whitelist
            # Whitelist: a hit is allowed (green); unknown faces are strangers (red).
            color = (0, 255, 0) if is_match else (0, 0, 255)  # green-whitelisted, red-stranger
    # Draw the face box.
    x1, y1, x2, y2 = bbox
    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
    # Build the text panel — key information only.
    text_lines = []
    # Line 0: match status.
    if not is_acceptable:
        text_lines.append("LOW QUALITY")
    else:
        status = f"MATCH: {best_match}: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}"
        text_lines.append(status)
    # Line 1: overall quality score.
    text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}")
    # Line 2: detection score.
    text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}")
    # Line 3: sharpness.
    text_lines.append(f"Clarity: {quality_metrics['clarity_score']:.1f}")
    # Lines 4-5: pose angles.
    text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°")
    text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°")
    # text_lines.append(f"Roll: {quality_metrics['roll']:.1f}°")
    # Lines 6-7: box size.
    text_lines.append(f"Width: {quality_metrics['bbox_width']:.1f}")
    text_lines.append(f"Height: {quality_metrics['bbox_height']:.1f}")
    # Measure the text block.
    max_text_width = 0
    total_text_height = 0
    line_heights = []
    for line in text_lines:
        (text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
        max_text_width = max(max_text_width, text_width)
        line_heights.append(text_height + baseline)
        total_text_height += text_height + baseline + 2
    # Background rectangle above the face box.
    bg_x1 = x1
    bg_y1 = y1 - total_text_height - 10
    bg_x2 = x1 + max_text_width + 10
    bg_y2 = y1
    # If it would extend past the top of the image, move it below the box.
    if bg_y1 < 0:
        bg_y1 = y2
        bg_y2 = y2 + total_text_height + 10
    # Semi-transparent background.
    overlay = frame.copy()
    cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1)
    alpha = 0.6
    cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)
    # Render the lines. NOTE: the indices below must stay in sync with the
    # order the lines were appended to text_lines above.
    # NOTE(review): unlike the box color above, the status-text color does
    # not invert for blacklist mode — confirm that is intended.
    current_y = bg_y1 + 15
    for i, line in enumerate(text_lines):
        if i == 0:  # status line
            if not is_acceptable:
                text_color = (128, 128, 128)  # gray - poor quality
            elif is_match:
                text_color = (0, 255, 0)  # green - match
            else:
                text_color = (0, 0, 255)  # red - no match
        # elif i == 1:  # quality-score line
        #     if quality_metrics['quality_score'] >= self.quality_threshold:
        #         text_color = (0, 255, 0)  # green - high quality
        #     else:
        #         text_color = (0, 0, 255)  # red - low quality
        elif i == 3:  # clarity line
            # Red when below the sharpness threshold.
            if quality_metrics['clarity_score'] >= self.clarity_threshold:
                text_color = (255, 255, 255)
            else:
                text_color = (0, 0, 255)
        elif i == 4:  # pitch
            if abs(quality_metrics['pitch']) > self.pitch_threshold:
                text_color = (0, 0, 255)  # red
            else:
                text_color = (255, 255, 255)
        elif i == 5:  # yaw
            if abs(quality_metrics['yaw']) > self.yaw_threshold:
                text_color = (0, 0, 255)  # red
            else:
                text_color = (255, 255, 255)
        elif i == 6:  # box width
            if quality_metrics['bbox_width'] < self.min_face_size:
                text_color = (0, 0, 255)  # red
            else:
                text_color = (255, 255, 255)
        elif i == 7:  # box height
            if quality_metrics['bbox_height'] < self.min_face_size:
                text_color = (0, 0, 255)  # red
            else:
                text_color = (255, 255, 255)
        else:
            text_color = (255, 255, 255)  # white - other info
        cv2.putText(frame, line, (x1 + 5, current_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1)
        current_y += line_heights[i]
    return frame
def set_quality_thresholds(self, clarity_threshold: float = None,
quality_threshold: float = None,
min_face_size: int = None):
"""设置质量阈值"""
if clarity_threshold is not None:
self.clarity_threshold = clarity_threshold
if quality_threshold is not None:
self.quality_threshold = quality_threshold
if min_face_size is not None:
self.min_face_size = min_face_size
print(
f"✅ 质量阈值更新 - 清晰度: {self.clarity_threshold}, 质量得分: {self.quality_threshold}, 最小尺寸: {self.min_face_size}")
def process_video_file(self, video_path: str, output_path: str = None,
                       skip_frames: int = 0, show_preview: bool = True):
    """
    Process a video file frame by frame.

    Args:
        video_path: input video path
        output_path: annotated output video path (None disables writing)
        skip_frames: frames to skip between processed frames (0 = process all)
        show_preview: show a live preview window
    """
    if not os.path.exists(video_path):
        print(f"❌ 视频文件不存在: {video_path}")
        return
    # Open the video file.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"❌ 无法打开视频文件: {video_path}")
        return
    # Source video properties.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
    print(f"🎯 当前模式: {self.list_mode}, 注册人脸数: {len(self.registered_faces)}")
    # Set up the output writer.
    if output_path:
        # Ensure the output directory exists ("." fallback covers bare filenames).
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # Fix: divide by (skip_frames + 1) to match the skip logic below.
        # The previous `fps / (skip_frames)` raised ZeroDivisionError for
        # skip_frames == 0 and produced the wrong output frame rate otherwise
        # (the other variants of this method already use skip_frames + 1).
        out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height))
    else:
        out = None
    # Frame loop.
    frame_index = 0
    processed_frames = 0
    start_time = time.time()
    print("🚀 开始处理视频...")
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Frame skipping: process only every (skip_frames + 1)-th frame.
        if skip_frames > 0 and frame_index % (skip_frames + 1) != 0:
            frame_index += 1
            continue
        # Process the current frame.
        processed_frame, results = self.process_frame(frame)
        # Write to the output video.
        if out:
            out.write(processed_frame)
        # Live preview.
        if show_preview:
            # Overlay progress / diagnostics.
            fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)} | Mode: {self.list_mode}"
            cv2.putText(processed_frame, fps_text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            # Overlay match statistics.
            match_count = sum(1 for r in results if r['is_match'])
            list_text = f"Match: {match_count}/{len(results)}"
            cv2.putText(processed_frame, list_text, (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            cv2.imshow('Video Face Recognition', processed_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        frame_index += 1
        processed_frames += 1
        # Periodic progress report.
        if frame_index % 30 == 0:
            progress = (frame_index / total_frames) * 100
            print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})")
    # Release resources.
    cap.release()
    if out:
        out.release()
    if show_preview:
        cv2.destroyAllWindows()
    # Performance summary.
    total_time = time.time() - start_time
    avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0
    print(f"\n🎉 视频处理完成!")
    print(f"📊 性能统计:")
    print(f" 总处理帧数: {processed_frames}")
    print(f" 总耗时: {total_time:.1f}秒")
    print(f" 平均每帧: {avg_processing_time:.1f}ms")
    print(f" 实际FPS: {processed_frames / total_time:.1f}")
    if output_path:
        print(f" 输出视频: {output_path}")
def process_webcam(self, camera_id: int = 0, output_path: str = None):
    """Run real-time list-mode recognition on a webcam stream.

    Args:
        camera_id: OpenCV camera index to open.
        output_path: Optional path for an annotated recording; None disables saving.
    """
    cap = cv2.VideoCapture(camera_id)
    if not cap.isOpened():
        print(f"❌ 无法打开摄像头 {camera_id}")
        return
    # Request a capture resolution (the driver may ignore it).
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
    # Set up the optional output writer.
    if output_path:
        # Fix: os.path.dirname() returns "" for a bare filename (as main()
        # passes "webcam_recording.mp4"), and os.makedirs("") raises — fall
        # back to ".".
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30.0  # webcams frequently report 0 FPS; use a sane default
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    else:
        out = None
    print(f"🎥 开始摄像头实时识别 - 模式: {self.list_mode} (按 'q' 退出)...")
    print(f"📋 注册人脸数: {len(self.registered_faces)}")
    while True:
        ret, frame = cap.read()
        if not ret:
            print("❌ 无法读取摄像头帧")
            break
        # Detect, match and annotate the current frame.
        processed_frame, results = self.process_frame(frame)
        # Overlay instantaneous FPS / face count / list mode.
        current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0
        info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)} | Mode: {self.list_mode}"
        cv2.putText(processed_frame, info_text, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        # Overlay match statistics.
        match_count = sum(1 for r in results if r['is_match'])
        list_text = f"Match: {match_count}/{len(results)}"
        cv2.putText(processed_frame, list_text, (10, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        # Write to the recording, if any.
        if out:
            out.write(processed_frame)
        # Live preview.
        cv2.imshow('Real-time Face Recognition', processed_frame)
        # Press 'q' to quit.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    # Release resources.
    cap.release()
    if out:
        out.release()
    cv2.destroyAllWindows()
    print("✅ 摄像头处理结束")
# 使用示例
def main():
    """Example driver for the black/whitelist video recognizer."""
    # Create the video recognition system.
    video_system = VideoFaceRecognition(use_gpu=True)
    # List mode (the constructor default is whitelist).
    # video_system.set_list_mode("blacklist")  # blacklist mode
    # video_system.set_list_mode("whitelist")  # whitelist mode
    # Load the enrollment gallery.
    register_dir = "test_data/register"  # directory of enrollment images
    if os.path.exists(register_dir):
        video_system.load_registered_faces(register_dir)
    else:
        print(f"⚠️ 注册目录不存在: {register_dir}")
    #
    # # 设置质量阈值(可根据实际情况调整)
    # video_system.set_quality_thresholds(
    #     clarity_threshold=1000.0,  # 清晰度阈值
    #     quality_threshold=0.6,  # 质量得分阈值
    #     min_face_size=30
    # )
    # # 选择处理模式:
    # print("请选择处理模式:")
    # print("1. 处理视频文件")
    # print("2. 实时摄像头")
    #
    # choice = input("请输入选择 (1 或 2): ").strip()
    choice = "1"  # hard-coded: always process the video file below
    if choice == "1":
        # Process a video file.
        video_path = "test_data/video/video_2.mp4"
        output_path = "test_data/output_video/video_2_white_7_gpu.mp4"
        # output_path = "test_data/output_video/video_2_black_2.mp4"
        # Performance: frame skipping.
        skip_frames = 2
        video_system.process_video_file(
            video_path=video_path,
            output_path=output_path,
            skip_frames=skip_frames,
            show_preview=False
        )
    elif choice == "2":
        # Real-time webcam.
        output_path = "webcam_recording.mp4"
        video_system.process_webcam(
            camera_id=0,
            output_path=output_path
        )
    else:
        print("❌ 无效选择")
# Script entry point.
if __name__ == "__main__":
    main()