# Source: SupervisorAI/backup/video_face_recognition_7.py
# Exported from a repository web view on 2025-12-20 18:07:49 +08:00
# (685 lines, 24 KiB, Python).
# NOTE: the hosting page warned about "ambiguous Unicode characters";
# those are the intentional emoji and Chinese text in log messages below.
# video_face_recognition.py
import cv2
import numpy as np
import time
from insightface.app import FaceAnalysis
from typing import List, Dict, Tuple, Optional
import os
import glob
# Version with the improved on-frame face-quality overlay.
class VideoFaceRecognition:
    """
    Video face recognition system.

    Detects faces with insightface, scores their quality (sharpness, pose,
    size, detector confidence) and matches them against a gallery of
    registered faces. Works on video files and live webcam streams and
    supports both blacklist and whitelist interpretation of a match.
    """

    # Available insightface model packs: buffalo_l (accurate) / buffalo_sc (fast).
    def __init__(self, model_name: str = 'buffalo_l', use_gpu: bool = True):
        """
        Initialize the model and the quality/matching thresholds.

        Args:
            model_name: insightface model pack name.
            use_gpu: run inference on GPU (ctx_id=0) when True, CPU (-1) otherwise.
        """
        # --- thresholds (tuned for whitelist mode) ---
        self.det_size = 640              # detector input size: 320 fast / 640 medium / 1280 slow
        self.list_mode = "whitelist"     # "blacklist" or "whitelist"
        self.det_threshold = 0.7         # face detection confidence
        self.clarity_threshold = 1000.0  # Laplacian-variance sharpness; below => blurry
        self.min_face_size = 30          # minimum face side length in pixels
        self.pitch_threshold = 40        # max |pitch| in degrees
        self.yaw_threshold = 40          # max |yaw| in degrees
        self.quality_threshold = 0.6     # composite quality-score threshold
        self.similarity_threshold = 0.13 # cosine-similarity threshold for a match
        # NOTE: looser values previously used for blacklist mode:
        # det 0.5, clarity 100.0, min size 20, pitch/yaw 90, similarity 0.3.

        # Face detection + embedding model.
        self.app = FaceAnalysis(name=model_name)
        self.app.prepare(
            ctx_id=0 if use_gpu else -1,
            det_thresh=self.det_threshold,
            det_size=(self.det_size, self.det_size),
        )

        # Registered gallery: {person name: embedding vector}.
        self.registered_faces = {}
        # Performance statistics.
        self.frame_count = 0
        self.processing_times = []  # per-frame processing time, milliseconds
        print(f"✅ 视频人脸识别系统初始化完成 - GPU: {use_gpu}")

    def set_list_mode(self, mode: str):
        """Switch between 'blacklist' and 'whitelist' matching modes (case-insensitive)."""
        if mode.lower() in ("blacklist", "whitelist"):
            self.list_mode = mode.lower()
            print(f"✅ 名单模式设置为: {self.list_mode}")
        else:
            # Fixed message: the two options were previously run together.
            print("❌ 无效的名单模式,请使用 'blacklist' 或 'whitelist'")

    def load_registered_faces(self, register_dir: str):
        """
        Load registered face images from a directory.

        The file name (without extension) is the person's name; the first
        detected face in each image provides that person's embedding.

        Returns:
            True if at least one face was successfully registered.
        """
        if not os.path.exists(register_dir):
            print(f"❌ 注册目录不存在: {register_dir}")
            return False

        # Collect candidate images. A set is used because on case-insensitive
        # filesystems '*.jpg' and '*.JPG' match the same files, which would
        # otherwise be loaded twice.
        image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp']
        image_files = set()
        for ext in image_extensions:
            image_files.update(glob.glob(os.path.join(register_dir, ext)))
            image_files.update(glob.glob(os.path.join(register_dir, ext.upper())))
        if not image_files:
            print(f"❌ 在目录 {register_dir} 中未找到图片文件")
            return False

        loaded_count = 0
        for image_path in sorted(image_files):
            # File name without extension is the person's name.
            person_name = os.path.splitext(os.path.basename(image_path))[0]
            img = cv2.imread(image_path)
            if img is None:
                print(f"❌ 无法读取图片: {image_path}")
                continue
            faces = self.app.get(img)
            if not faces:
                print(f"❌ 图片中未检测到人脸: {image_path}")
                continue
            # Use the first detected face in the image.
            self.registered_faces[person_name] = faces[0].embedding
            loaded_count += 1
            print(f"✅ 加载注册人脸: {person_name}")
        print(f"🎉 成功加载 {loaded_count} 张注册人脸")
        return loaded_count > 0

    def find_best_match(self, embedding: np.ndarray) -> Tuple[Optional[str], float]:
        """
        Find the registered face most similar to the given embedding.

        Similarity is the cosine similarity between L2-normalized embeddings.

        Returns:
            (best matching name or None, best similarity in [-1, 1]).
        """
        if not self.registered_faces:
            return None, 0.0
        best_similarity = 0.0
        best_name = None
        # Normalize the query once; gallery embeddings are normalized per entry.
        query_emb = embedding / np.linalg.norm(embedding)
        for name, registered_embedding in self.registered_faces.items():
            reg_emb = registered_embedding / np.linalg.norm(registered_embedding)
            similarity = float(np.dot(query_emb, reg_emb))
            if similarity > best_similarity:
                best_similarity = similarity
                best_name = name
        return best_name, best_similarity

    def calculate_clarity(self, face_region: np.ndarray) -> float:
        """
        Estimate the sharpness of a face crop.

        Uses the variance of the Laplacian: higher values mean a sharper image.
        """
        if len(face_region.shape) == 3:
            gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY)
        else:
            gray = face_region
        return cv2.Laplacian(gray, cv2.CV_64F).var()

    def is_face_quality_acceptable(self, face, frame: np.ndarray) -> Tuple[bool, Dict]:
        """
        Judge whether a detected face is good enough to trust for recognition.

        Checks detection confidence, head pose, bounding-box size and image
        sharpness, and folds them into a composite 'quality_score'.

        Returns:
            (is_acceptable, quality_metrics dict).
        """
        quality_metrics = {}
        is_acceptable = True

        # 1. Detection confidence.
        quality_metrics['det_score'] = float(face.det_score)

        # 2. Head pose angles (degrees). Missing pose data is recorded as a
        #    worst-case 100° so the pose checks below reject the face.
        if hasattr(face, 'pose') and face.pose is not None:
            pitch, yaw, roll = face.pose
            quality_metrics['pitch'] = float(pitch)
            quality_metrics['yaw'] = float(yaw)
            quality_metrics['roll'] = float(roll)
        else:
            quality_metrics['pitch'] = 100.0
            quality_metrics['yaw'] = 100.0
            quality_metrics['roll'] = 100.0

        # 3. Bounding-box geometry.
        bbox = face.bbox
        x1, y1, x2, y2 = bbox.astype(int)
        width = x2 - x1
        height = y2 - y1
        quality_metrics['bbox_width'] = width
        quality_metrics['bbox_height'] = height
        quality_metrics['bbox_area'] = width * height
        quality_metrics['aspect_ratio'] = width / height if height > 0 else 0

        # 4. Sharpness of the face crop, clipped to the frame bounds.
        h, w = frame.shape[:2]
        x1_clip = max(0, x1)
        y1_clip = max(0, y1)
        x2_clip = min(w, x2)
        y2_clip = min(h, y2)
        if x2_clip > x1_clip and y2_clip > y1_clip:
            face_region = frame[y1_clip:y2_clip, x1_clip:x2_clip]
            quality_metrics['clarity_score'] = self.calculate_clarity(face_region)
        else:
            # Box lies entirely outside the frame.
            quality_metrics['clarity_score'] = 0.0

        # 5. Composite score: detection confidence minus penalties.
        base_score = quality_metrics['det_score']
        clarity_penalty = 0.0
        if quality_metrics['clarity_score'] < self.clarity_threshold:
            clarity_penalty = 0.3  # blurry faces are penalised heavily
            is_acceptable = False
        pose_penalty = 0.0
        if abs(quality_metrics['yaw']) > self.yaw_threshold:
            pose_penalty += 0.2
            is_acceptable = False
        if abs(quality_metrics['pitch']) > self.pitch_threshold:
            pose_penalty += 0.2
            is_acceptable = False
        size_penalty = 0.0
        if min(width, height) < self.min_face_size:
            size_penalty = 0.2
            is_acceptable = False
        quality_metrics['quality_score'] = max(
            0.1, base_score - clarity_penalty - pose_penalty - size_penalty)
        return is_acceptable, quality_metrics

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """
        Detect, assess and recognise all faces in a single frame.

        Returns:
            (annotated frame, list of per-face result dicts).
        """
        start_time = time.time()
        faces = self.app.get(frame)
        results = []
        for face in faces:
            is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
            best_name, similarity = self.find_best_match(face.embedding)
            # A face "matches" when it resembles a registered face closely
            # enough. The criterion is the same in both modes (the previous
            # if/else branches were identical); the modes differ only in how
            # a match is interpreted downstream:
            #   blacklist: match => alert; whitelist: match => allowed.
            is_match = best_name is not None and similarity >= self.similarity_threshold
            result = {
                'bbox': face.bbox.astype(int).tolist(),
                'similarity': similarity,
                'best_match': best_name,
                'is_match': is_match,
                'det_score': float(face.det_score),
                'quality_metrics': quality_metrics,
                'is_acceptable': is_acceptable,  # whether quality gates passed
            }
            results.append(result)
            frame = self._draw_detection(frame, result)
        # Performance bookkeeping (milliseconds per frame).
        processing_time = (time.time() - start_time) * 1000
        self.processing_times.append(processing_time)
        self.frame_count += 1
        return frame, results

    def _draw_detection(self, frame: np.ndarray, result: Dict) -> np.ndarray:
        """Draw the bounding box and a quality/match info panel for one face."""
        bbox = result['bbox']
        similarity = result['similarity']
        is_match = result['is_match']
        is_acceptable = result['is_acceptable']
        quality_metrics = result['quality_metrics']
        best_match = result['best_match']

        # Box colour: grey for rejected quality, otherwise mode-dependent.
        if not is_acceptable:
            color = (128, 128, 128)  # grey - quality not acceptable
        elif self.list_mode == "blacklist":
            # Blacklist: a match (listed person) is an alert -> red; otherwise green.
            color = (0, 0, 255) if is_match else (0, 255, 0)
        else:
            # Whitelist: a match (listed person) is allowed -> green; stranger -> red.
            color = (0, 255, 0) if is_match else (0, 0, 255)

        x1, y1, x2, y2 = bbox
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # Build the info panel, one metric per line.
        text_lines = []
        if not is_acceptable:
            text_lines.append("LOW QUALITY")
        else:
            status = f"MATCH: {best_match}: {similarity:.3f}" if is_match else f"NO MATCH: {similarity:.3f}"
            text_lines.append(status)
        text_lines.append(f"Quality: {quality_metrics['quality_score']:.3f}")
        text_lines.append(f"DetScore: {quality_metrics['det_score']:.3f}")
        text_lines.append(f"Clarity: {quality_metrics['clarity_score']:.1f}")
        text_lines.append(f"Pitch: {quality_metrics['pitch']:.1f}°")
        text_lines.append(f"Yaw: {quality_metrics['yaw']:.1f}°")
        text_lines.append(f"Width: {quality_metrics['bbox_width']:.1f}")
        text_lines.append(f"Height: {quality_metrics['bbox_height']:.1f}")

        # Measure the panel so the background rectangle fits all lines.
        max_text_width = 0
        total_text_height = 0
        line_heights = []
        for line in text_lines:
            (text_width, text_height), baseline = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
            max_text_width = max(max_text_width, text_width)
            line_heights.append(text_height + baseline)
            total_text_height += text_height + baseline + 2

        # Panel position: above the box, or below it if it would leave the frame.
        bg_x1 = x1
        bg_y1 = y1 - total_text_height - 10
        bg_x2 = x1 + max_text_width + 10
        bg_y2 = y1
        if bg_y1 < 0:
            bg_y1 = y2
            bg_y2 = y2 + total_text_height + 10

        # Semi-transparent black background.
        overlay = frame.copy()
        cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), (0, 0, 0), -1)
        alpha = 0.6
        cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)

        # Draw each line; failing metrics are shown in red, passing in white.
        current_y = bg_y1 + 15
        for i, line in enumerate(text_lines):
            if i == 0:  # status line
                if not is_acceptable:
                    text_color = (128, 128, 128)  # grey - poor quality
                elif is_match:
                    text_color = (0, 255, 0)      # green - match
                else:
                    text_color = (0, 0, 255)      # red - no match
            elif i == 3:  # clarity line
                if quality_metrics['clarity_score'] >= self.clarity_threshold:
                    text_color = (255, 255, 255)
                else:
                    text_color = (0, 0, 255)
            elif i == 4:  # pitch
                if abs(quality_metrics['pitch']) > self.pitch_threshold:
                    text_color = (0, 0, 255)
                else:
                    text_color = (255, 255, 255)
            elif i == 5:  # yaw
                if abs(quality_metrics['yaw']) > self.yaw_threshold:
                    text_color = (0, 0, 255)
                else:
                    text_color = (255, 255, 255)
            elif i == 6:  # bbox width
                if quality_metrics['bbox_width'] < self.min_face_size:
                    text_color = (0, 0, 255)
                else:
                    text_color = (255, 255, 255)
            elif i == 7:  # bbox height
                if quality_metrics['bbox_height'] < self.min_face_size:
                    text_color = (0, 0, 255)
                else:
                    text_color = (255, 255, 255)
            else:
                text_color = (255, 255, 255)  # white - other info
            cv2.putText(frame, line, (x1 + 5, current_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, text_color, 1)
            current_y += line_heights[i]
        return frame

    def set_quality_thresholds(self, clarity_threshold: float = None,
                               quality_threshold: float = None,
                               min_face_size: int = None):
        """Update quality thresholds; only the arguments given are changed."""
        if clarity_threshold is not None:
            self.clarity_threshold = clarity_threshold
        if quality_threshold is not None:
            self.quality_threshold = quality_threshold
        if min_face_size is not None:
            self.min_face_size = min_face_size
        print(
            f"✅ 质量阈值更新 - 清晰度: {self.clarity_threshold}, 质量得分: {self.quality_threshold}, 最小尺寸: {self.min_face_size}")

    def process_video_file(self, video_path: str, output_path: str = None,
                           skip_frames: int = 0, show_preview: bool = True):
        """
        Process a video file frame by frame.

        Args:
            video_path: input video path.
            output_path: optional path for the annotated output video.
            skip_frames: number of frames skipped after each processed frame
                (0 = process every frame).
            show_preview: show a live preview window ('q' aborts).
        """
        if not os.path.exists(video_path):
            print(f"❌ 视频文件不存在: {video_path}")
            return
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"❌ 无法打开视频文件: {video_path}")
            return

        # Input video properties.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if fps <= 0:
            fps = 25.0  # some containers report 0 FPS; fall back to a sane default
        print(f"📹 视频信息: {width}x{height}, {fps:.1f}FPS, 总帧数: {total_frames}")
        print(f"🎯 当前模式: {self.list_mode}, 注册人脸数: {len(self.registered_faces)}")

        # Optional output writer.
        if output_path:
            out_dir = os.path.dirname(output_path)
            if out_dir:  # dirname is '' for bare filenames; makedirs('') would raise
                os.makedirs(out_dir, exist_ok=True)
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            # Only every (skip_frames + 1)-th frame is written, so the output
            # FPS must be divided by that factor. (The old 'fps / skip_frames'
            # was wrong and raised ZeroDivisionError for skip_frames == 0.)
            out = cv2.VideoWriter(output_path, fourcc, fps / (skip_frames + 1), (width, height))
        else:
            out = None

        frame_index = 0
        processed_frames = 0
        start_time = time.time()
        print("🚀 开始处理视频...")
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Frame skipping: process only every (skip_frames + 1)-th frame.
            if skip_frames > 0 and frame_index % (skip_frames + 1) != 0:
                frame_index += 1
                continue
            processed_frame, results = self.process_frame(frame)
            if out:
                out.write(processed_frame)
            if show_preview:
                # Overlay progress / statistics.
                fps_text = f"Frame: {frame_index}/{total_frames} | Faces: {len(results)} | Mode: {self.list_mode}"
                cv2.putText(processed_frame, fps_text, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
                match_count = sum(1 for r in results if r['is_match'])
                list_text = f"Match: {match_count}/{len(results)}"
                cv2.putText(processed_frame, list_text, (10, 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
                cv2.imshow('Video Face Recognition', processed_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            frame_index += 1
            processed_frames += 1
            # Periodic progress report (guard: streams may report 0 frames).
            if frame_index % 30 == 0 and total_frames > 0:
                progress = (frame_index / total_frames) * 100
                print(f"📊 处理进度: {progress:.1f}% ({frame_index}/{total_frames})")

        # Release resources.
        cap.release()
        if out:
            out.release()
        if show_preview:
            cv2.destroyAllWindows()

        # Final performance report.
        total_time = time.time() - start_time
        avg_processing_time = np.mean(self.processing_times) if self.processing_times else 0
        actual_fps = processed_frames / total_time if total_time > 0 else 0.0
        print(f"\n🎉 视频处理完成!")
        print(f"📊 性能统计:")
        print(f" 总处理帧数: {processed_frames}")
        print(f" 总耗时: {total_time:.1f}")
        print(f" 平均每帧: {avg_processing_time:.1f}ms")
        print(f" 实际FPS: {actual_fps:.1f}")
        if output_path:
            print(f" 输出视频: {output_path}")

    def process_webcam(self, camera_id: int = 0, output_path: str = None):
        """
        Run real-time recognition on a webcam stream ('q' quits).

        Args:
            camera_id: OpenCV camera index.
            output_path: optional path to record the annotated stream.
        """
        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            print(f"❌ 无法打开摄像头 {camera_id}")
            return
        # Request a capture resolution (best-effort; driver may ignore it).
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

        # Optional output writer.
        if output_path:
            out_dir = os.path.dirname(output_path)
            if out_dir:  # guard: makedirs('') raises for bare filenames
                os.makedirs(out_dir, exist_ok=True)
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                fps = 30.0  # webcams commonly report 0 FPS; use a usable default
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        else:
            out = None

        print(f"🎥 开始摄像头实时识别 - 模式: {self.list_mode} (按 'q' 退出)...")
        print(f"📋 注册人脸数: {len(self.registered_faces)}")
        while True:
            ret, frame = cap.read()
            if not ret:
                print("❌ 无法读取摄像头帧")
                break
            processed_frame, results = self.process_frame(frame)
            # Live FPS derived from the last frame's processing time.
            current_fps = 1000 / self.processing_times[-1] if self.processing_times else 0
            info_text = f"FPS: {current_fps:.1f} | Faces: {len(results)} | Mode: {self.list_mode}"
            cv2.putText(processed_frame, info_text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            match_count = sum(1 for r in results if r['is_match'])
            list_text = f"Match: {match_count}/{len(results)}"
            cv2.putText(processed_frame, list_text, (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            if out:
                out.write(processed_frame)
            cv2.imshow('Real-time Face Recognition', processed_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        # Release resources.
        cap.release()
        if out:
            out.release()
        cv2.destroyAllWindows()
        print("✅ 摄像头处理结束")
# Usage example
def main():
    """Build the recognizer, register known faces, then process a video source."""
    video_system = VideoFaceRecognition(use_gpu=True)
    # Mode can be switched here if needed, e.g.:
    #   video_system.set_list_mode("blacklist")

    # Register known faces from disk (file name == person name).
    register_dir = "test_data/register"
    if not os.path.exists(register_dir):
        print(f"⚠️ 注册目录不存在: {register_dir}")
    else:
        video_system.load_registered_faces(register_dir)

    # Hard-wired source selection: "1" = video file, "2" = live webcam.
    choice = "1"
    if choice == "1":
        # Offline file processing; skipping frames trades coverage for speed.
        video_system.process_video_file(
            video_path="test_data/video/video_2.mp4",
            output_path="test_data/output_video/video_2_white_7_gpu.mp4",
            skip_frames=2,
            show_preview=False,
        )
    elif choice == "2":
        # Real-time webcam with recording.
        video_system.process_webcam(
            camera_id=0,
            output_path="webcam_recording.mp4",
        )
    else:
        print("❌ 无效选择")


if __name__ == "__main__":
    main()