完成犯人带出检测

This commit is contained in:
zqc
2026-01-08 22:19:50 +08:00
parent 931f2ff861
commit 1b8b24ab3e
2 changed files with 164 additions and 2 deletions

View File

@@ -24,6 +24,15 @@ class VideoFacePrisonBiz(BaseFaceBiz):
face_analysis: 已初始化好的FaceAnalysis实例
"""
super().__init__(face_analysis)
# 人脸匹配跟踪配置
self.detection_window_seconds = 2.0 # 检测窗口时间(秒)
self.min_match_count = 5 # 最小匹配次数
self.cooldown_seconds = 30 # 冷却时间(秒)
# 跟踪数据结构
self.person_tracking = {} # {person_id: [timestamp1, timestamp2, ...]}
self.person_cooldown = {} # {person_id: cooldown_end_time}
def draw_detections(self, frame: np.ndarray, results: List[Dict]) -> np.ndarray:
"""
@@ -64,6 +73,120 @@ class VideoFacePrisonBiz(BaseFaceBiz):
return frame
def set_detection_window_seconds(self, window_seconds: float):
"""
设置检测窗口时间
参数:
window_seconds: 检测窗口时间(秒)
"""
self.detection_window_seconds = window_seconds
def get_detection_window_seconds(self) -> float:
"""
获取检测窗口时间
返回:
检测窗口时间(秒)
"""
return self.detection_window_seconds
def set_min_match_count(self, min_matches: int):
"""
设置最小匹配次数
参数:
min_matches: 最小匹配次数
"""
self.min_match_count = min_matches
def get_min_match_count(self) -> int:
"""
获取最小匹配次数
返回:
最小匹配次数
"""
return self.min_match_count
def set_cooldown_seconds(self, cooldown_seconds: int):
"""
设置冷却时间
参数:
cooldown_seconds: 冷却时间(秒)
"""
self.cooldown_seconds = cooldown_seconds
def get_cooldown_seconds(self) -> int:
"""
获取冷却时间
返回:
冷却时间(秒)
"""
return self.cooldown_seconds
def _cleanup_old_records(self, current_time: float):
"""
清理过期的跟踪记录
参数:
current_time: 当前时间戳
"""
# 清理过期的匹配记录
for person_id in list(self.person_tracking.keys()):
# 保留在检测窗口内的记录
self.person_tracking[person_id] = [
ts for ts in self.person_tracking[person_id]
if current_time - ts <= self.detection_window_seconds
]
# 如果记录为空删除该person_id
if not self.person_tracking[person_id]:
del self.person_tracking[person_id]
# 清理过期的冷却记录
for person_id in list(self.person_cooldown.keys()):
if current_time > self.person_cooldown[person_id]:
del self.person_cooldown[person_id]
def _is_person_passed(self, person_id: str, current_time: float) -> bool:
"""
判断人员是否已经通过
参数:
person_id: 人员标识符
current_time: 当前时间戳
返回:
是否通过
"""
# 检查是否在冷却期内
if person_id in self.person_cooldown:
if current_time <= self.person_cooldown[person_id]:
# 还在冷却期内,忽略此人
return False
else:
# 冷却期结束,删除记录
del self.person_cooldown[person_id]
# 检查是否达到最小匹配次数
if person_id in self.person_tracking:
recent_matches = [
ts for ts in self.person_tracking[person_id]
if current_time - ts <= self.detection_window_seconds
]
if len(recent_matches) >= self.min_match_count:
# 达到条件,设置冷却期
self.person_cooldown[person_id] = current_time + self.cooldown_seconds
# 清空该人员的匹配记录
del self.person_tracking[person_id]
return True
return False
def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict], float]:
"""
处理单帧图像
@@ -72,6 +195,10 @@ class VideoFacePrisonBiz(BaseFaceBiz):
(原始帧, 识别结果列表, 处理时间ms)
"""
start_time = time.time()
current_time = time.time()
# 清理过期的跟踪记录
self._cleanup_old_records(current_time)
# 人脸检测和识别
faces = self.app.get(frame)
@@ -83,14 +210,25 @@ class VideoFacePrisonBiz(BaseFaceBiz):
# 查找最佳匹配
best_name, similarity = self.find_best_match(face.embedding)
is_match = best_name is not None and similarity >= self.similarity_threshold
# 新增:判断是否已经通过
has_passed = False
if is_match and best_name:
has_passed = self._is_person_passed(best_name, current_time)
# 如果匹配但未通过,记录匹配时间
if is_match and not has_passed:
if best_name not in self.person_tracking:
self.person_tracking[best_name] = []
self.person_tracking[best_name].append(current_time)
result = {
'bbox': face.bbox.astype(int).tolist(),
'similarity': similarity,
'best_match': best_name,
'is_match': is_match,
'has_passed': has_passed, # 新增:是否已经通过
'det_score': float(face.det_score),
'quality_metrics': quality_metrics,
'is_acceptable': is_acceptable