Files
SupervisorAI/biz/video_face_prison_biz.py

243 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
视频检查业务类 - RTSP专用
专门处理RTSP视频流中的人脸识别和检测
"""
import cv2
import numpy as np
from typing import Optional, List, Dict, Tuple
import time
from insightface.app import FaceAnalysis
from biz.base_face_biz import BaseFaceBiz
class VideoFacePrisonBiz(BaseFaceBiz):
"""
视频检查业务类 - RTSP专用
专门处理RTSP视频流中的人脸识别和检测
"""
def __init__(self, face_analysis: FaceAnalysis):
"""
初始化视频检查业务类
参数:
face_analysis: 已初始化好的FaceAnalysis实例
"""
super().__init__(face_analysis)
# 人脸匹配跟踪配置
self.detection_window_seconds = 2.0 # 检测窗口时间(秒)
self.min_match_count = 5 # 最小匹配次数
self.cooldown_seconds = 30 # 冷却时间(秒)
# 跟踪数据结构
self.person_tracking = {} # {person_id: [timestamp1, timestamp2, ...]}
self.person_cooldown = {} # {person_id: cooldown_end_time}
def draw_detections(self, frame: np.ndarray, results: List[Dict]) -> np.ndarray:
"""
重写绘制检测结果方法
只在检测到黑名单匹配时用红色绘制人脸框
参数:
frame: 原始帧图像
results: 检测结果列表
返回:
绘制后的帧图像
"""
for result in results:
# 只在黑名单匹配时绘制
if result['is_match']:
bbox = result['bbox']
# 使用红色绘制人脸框
x1, y1, x2, y2 = bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
# 添加简单的匹配信息
best_match = result['best_match']
similarity = result['similarity']
# 绘制匹配信息
text = f"{best_match}: {similarity:.3f}"
text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
# 绘制文本背景
cv2.rectangle(frame, (x1, y1 - text_size[1] - 5),
(x1 + text_size[0], y1), (0, 0, 0), -1)
# 绘制文本
cv2.putText(frame, text, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
return frame
def set_detection_window_seconds(self, window_seconds: float):
"""
设置检测窗口时间
参数:
window_seconds: 检测窗口时间(秒)
"""
self.detection_window_seconds = window_seconds
def get_detection_window_seconds(self) -> float:
"""
获取检测窗口时间
返回:
检测窗口时间(秒)
"""
return self.detection_window_seconds
def set_min_match_count(self, min_matches: int):
"""
设置最小匹配次数
参数:
min_matches: 最小匹配次数
"""
self.min_match_count = min_matches
def get_min_match_count(self) -> int:
"""
获取最小匹配次数
返回:
最小匹配次数
"""
return self.min_match_count
def set_cooldown_seconds(self, cooldown_seconds: int):
"""
设置冷却时间
参数:
cooldown_seconds: 冷却时间(秒)
"""
self.cooldown_seconds = cooldown_seconds
def get_cooldown_seconds(self) -> int:
"""
获取冷却时间
返回:
冷却时间(秒)
"""
return self.cooldown_seconds
def _cleanup_old_records(self, current_time: float):
"""
清理过期的跟踪记录
参数:
current_time: 当前时间戳
"""
# 清理过期的匹配记录
for person_id in list(self.person_tracking.keys()):
# 保留在检测窗口内的记录
self.person_tracking[person_id] = [
ts for ts in self.person_tracking[person_id]
if current_time - ts <= self.detection_window_seconds
]
# 如果记录为空删除该person_id
if not self.person_tracking[person_id]:
del self.person_tracking[person_id]
# 清理过期的冷却记录
for person_id in list(self.person_cooldown.keys()):
if current_time > self.person_cooldown[person_id]:
del self.person_cooldown[person_id]
def _is_person_passed(self, person_id: str, current_time: float) -> Tuple[bool, Optional[str]]:
"""
判断人员是否已经通过
参数:
person_id: 人员标识符
current_time: 当前时间戳
返回:
(是否通过, 通过的person_id)
"""
# 检查是否在冷却期内
if person_id in self.person_cooldown:
if current_time <= self.person_cooldown[person_id]:
# 还在冷却期内,忽略此人
return False, None
else:
# 冷却期结束,删除记录
del self.person_cooldown[person_id]
# 检查是否达到最小匹配次数
if person_id in self.person_tracking:
recent_matches = [
ts for ts in self.person_tracking[person_id]
if current_time - ts <= self.detection_window_seconds
]
if len(recent_matches) >= self.min_match_count:
# 达到条件,设置冷却期
self.person_cooldown[person_id] = current_time + self.cooldown_seconds
# 清空该人员的匹配记录
del self.person_tracking[person_id]
return True, person_id
return False, None
def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict], float]:
"""
处理单帧图像
返回:
(原始帧, 识别结果列表, 处理时间ms)
"""
start_time = time.time()
current_time = time.time()
# 清理过期的跟踪记录
self._cleanup_old_records(current_time)
# 人脸检测和识别
faces = self.app.get(frame)
results = []
for face in faces:
# 检查人脸质量是否可接受
is_acceptable, quality_metrics = self.is_face_quality_acceptable(face, frame)
# 查找最佳匹配
best_name, similarity = self.find_best_match(face.embedding)
is_match = best_name is not None and similarity >= self.similarity_threshold
# 新增:判断是否已经通过
has_passed = False
passed_person_id = None
if is_match and best_name:
has_passed, passed_person_id = self._is_person_passed(best_name, current_time)
# 如果匹配但未通过,记录匹配时间
if is_match and not has_passed:
if best_name not in self.person_tracking:
self.person_tracking[best_name] = []
self.person_tracking[best_name].append(current_time)
result = {
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'best_match': best_name,
'is_match': is_match,
'has_passed': has_passed, # 新增:是否已经通过
'passed_person_id': passed_person_id, # 新增通过的person_id
'det_score': float(face.det_score),
'quality_metrics': quality_metrics,
'is_acceptable': is_acceptable
}
results.append(result)
processing_time = (time.time() - start_time) * 1000
return frame, results, processing_time