Files
SupervisorAI/biz/checkpoint/checkpoint_biz.py
2026-03-04 16:20:31 +08:00

948 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import numpy as np
import base64
from typing import Dict, Any
import threading
import time
import queue
import requests
from concurrent.futures import ThreadPoolExecutor
from common.contants import ALERT_PUSH_URL
# -------------------------- Kadian 检测相关导入 --------------------------
from algorithm.common.npu_yolo_onnx_person_car_phone import YOLOv8_ONNX # 主检测模型(人/车/后备箱/手机)
from algorithm.common.npu_yolo_pose_onnx import YOLOv8_Pose_ONNX # Pose 专用模型
from yolox.tracker.byte_tracker import BYTETracker
from utils.logger import get_logger
logger = get_logger(__name__)
# ========================= 配置区 =========================
# Kadian 模型路径与ROI可根据实际情况修改
DETECT_MODEL_PATH = 'YOLO_Weight/car_opentrunk_person_phone.onnx'
POSE_MODEL_PATH = 'YOLO_Weight/yolov8l-pose.onnx'
# 默认相对ROI与原文件一致
#ROI_RELATIVE = np.array([
# [0.10989583333333333, 0.006481481481481481],
# [0.421875, 0.005555555555555556],
# [0.9921875, 0.9888888888888889],
# [0.3411458333333333, 0.9861111111111112]
#])
ROI_RELATIVE=np.array([
[0.15,0.001],
[0.5,0.001],
[1.0,0.8],
[0.35,1.0]
])
ALERT_PUSH_INTERVAL = 5.0
# 输入尺寸
PERSON_CAR_INPUT_SIZE = 640
POSE_INPUT_SIZE = 640
RTSP_TARGET_FPS = 10.0
# ========================= Kadian TrafficMonitor精简版专为服务设计 =========================
class KadianDetector:
def __init__(self, params=None):
# 摄像头额外参数
self.params = params if params is not None else {}
# 模型加载
self.detector = YOLOv8_ONNX(DETECT_MODEL_PATH, conf_threshold=0.15, iou_threshold=0.65,
input_size=PERSON_CAR_INPUT_SIZE)
self.pose_detector = YOLOv8_Pose_ONNX(POSE_MODEL_PATH, conf_threshold=0.45, iou_threshold=0.6,
input_size=POSE_INPUT_SIZE)
# Tracker
class TrackerArgs:
track_thresh = 0.2
track_buffer = 60
match_thresh = 0.9
mot20 = True
self.tracker = BYTETracker(TrackerArgs(), frame_rate=RTSP_TARGET_FPS)
self.track_role = {}
self.fps = RTSP_TARGET_FPS
# ROI 处理:优先从 params 获取,否则使用默认值 ROI_RELATIVE
roi_points = self.params.get('roi_points', ROI_RELATIVE)
self.roi_points = np.array(roi_points, dtype=np.float64) if roi_points is not None else None
# ==========================================
# 超参数设置 (Hyperparameters)
# ==========================================
# 1. 业务判定时间阈值
self.TIME_THRESHOLD_ONLY_ONE = 10.0 # 单人单检判定时长
self.TIME_THRESHOLD_NOBODY = 10.0 # 无人检查判定时长
# 后备箱检查判定阈值
self.TIME_THRESHOLD_TRUNK_OPEN = 0.1
# 新增:手机检测判定阈值
self.TIME_THRESHOLD_PHONE = 3.0 # 手机检测持续1秒30帧 @30fps
self.TIME_TOLERANCE_PHONE = 1.5 # 手机丢失缓冲时间(防抖动)
# 新增:制服检测判定阈值
self.TIME_THRESHOLD_UNIFORM = 2.0 # 制服不合规判定时长
self.TIME_TOLERANCE_UNIFORM = 1.0 # 制服合规恢复缓冲时间
# 2. Person 丢帧缓冲
self.TIME_TOLERANCE_PERSON = 3.0
# 车辆最小停留时间阈值 (小于此时间视为无人检查/直接通过)
self.TIME_THRESHOLD_CAR_MIN_DURATION = 10.0
# 3. Car 丢帧/ID维持缓冲
self.TIME_TOLERANCE_CAR = 10.0
# 4 OnlyOne 丢帧缓冲
self.TIME_TOLERANCE_ONLY_ONE_DURATION = 3.0
# --- 计算对应的帧数阈值 ---
self.frame_thresh_one = int(self.TIME_THRESHOLD_ONLY_ONE * self.fps)
self.frame_thresh_nobody = int(self.TIME_THRESHOLD_NOBODY * self.fps)
self.frame_thresh_trunk_valid = int(self.TIME_THRESHOLD_TRUNK_OPEN * self.fps)
# 新增:手机检测帧数阈值
self.frame_thresh_phone = int(self.TIME_THRESHOLD_PHONE * self.fps)
self.frame_buffer_phone = int(self.TIME_TOLERANCE_PHONE * self.fps)
# 新增:制服检测帧数阈值
self.frame_thresh_uniform = int(self.TIME_THRESHOLD_UNIFORM * self.fps)
self.frame_buffer_uniform = int(self.TIME_TOLERANCE_UNIFORM * self.fps)
self.frame_thresh_car_min_duration = int(self.TIME_THRESHOLD_CAR_MIN_DURATION * self.fps)
self.frame_buffer_limit_person = int(self.TIME_TOLERANCE_PERSON * self.fps)
self.frame_buffer_limit_car = int(self.TIME_TOLERANCE_CAR * self.fps)
self.frame_buffer_limit_onlyOne = int(self.TIME_TOLERANCE_ONLY_ONE_DURATION * self.fps)
logger.info(f"\n超参数设置:")
logger.info(f" FPS: {self.fps:.2f}")
logger.info(f" 判定 'Only One' / 'Nobody' 需连续: {self.frame_thresh_one}")
logger.info(f" 判定 'Trunk Checked' 需累计检测: {self.frame_thresh_trunk_valid}")
logger.info(f" 判定 'Phone Detected' 需累计检测: {self.frame_thresh_phone}")
logger.info(f" 手机丢失缓冲帧数: {self.frame_buffer_phone}")
logger.info(f" 判定 'Uniform Invalid' 需连续检测: {self.frame_thresh_uniform}")
logger.info(f" 制服合规恢复缓冲帧数: {self.frame_buffer_uniform}")
logger.info(f" 判定 'Too Fast' 最小停留: {self.frame_thresh_car_min_duration}")
self.onlyone_counter = 0
# self.onlyone_lost_counter = 0
# self.onlyone_buffer_limit = self.frame_buffer_limit_person # 10帧1秒
self.onlyone_thresh = self.frame_thresh_one # 30帧3秒
self.nobody_counter = 0
self.nobody_present_counter = 0
self.nobody_buffer_limit = self.frame_buffer_limit_onlyOne
self.nobody_thresh = self.frame_thresh_nobody # 20帧2秒
self.current_frame_idx = 0
self.ignore_show_seconds = 0.5 # 未检测的警告显示时长
self.openTrunk_show_seconds = 0.5 # 打开后备箱的警告显示时长
# 手机检测状态变量(独立于车辆)
self.phone_detection_frames = 0 # 连续检测到手机的帧数
self.phone_missing_frames = 0 # 连续未检测到手机的帧数
self.phone_alert_active = False # 手机报警是否激活
# 新增:制服检测状态变量
self.pose_person_count = 0 # 骨骼点模型检测的ROI内人员数量
self.uniform_alert_active = False # 制服报警是否激活
self.uniform_detection_frames = 0 # 连续检测到制服不合规的帧数
self.uniform_recovery_frames = 0 # 连续恢复合规的帧数
# 车辆注册表 (字典)
self.roi_car_registry = {}
# 违规车辆记录 (后备箱未检)
self.unchecked_trunk_alerts = {}
# 违规车辆记录 (通过过快 -> 归类为 Ignore)
self.fast_pass_alerts = {}
def _get_roi_points(self, frame_width: int, frame_height: int):
"""
每帧动态计算正确的 ROI 绝对坐标,并确保类型为 np.int32
用于 pointPolygonTest 和 polylines
"""
if self.roi_points is None:
raise ValueError("ROI points must be provided; cannot be None.")
if self.roi_points.max() <= 1.0:
# 相对坐标 → 转换为绝对
roi_abs = self.roi_points * np.array([frame_width, frame_height])
else:
# 绝对坐标,直接使用
roi_abs = self.roi_points.copy()
# 强制转为 int32关键解决 OpenCV 断言错误)
return roi_abs.astype(np.int32)
def check_point_in_roi(self, roi_points, point):
return cv2.pointPolygonTest(roi_points, point, False) >= 0
def compute_iou(self, boxA, boxB):
# box = [x1, y1, x2, y2]
xA = max(boxA[0], boxB[0])
yA = max(boxA[1], boxB[1])
xB = min(boxA[2], boxB[2])
yB = min(boxA[3], boxB[3])
interW = max(0, xB - xA)
interH = max(0, yB - yA)
interArea = interW * interH
boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
unionArea = boxAArea + boxBArea - interArea
if unionArea == 0:
return 0.0
return interArea / unionArea
def draw_alert(self, frame, text, color=(0, 0, 255), sub_text=None, offset_y=0):
"""在右上角绘制警告文字 (支持垂直偏移,防止文字重叠)"""
font_scale = 1.5
thickness = 3
font = cv2.FONT_HERSHEY_SIMPLEX
(text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)
x = self.width - text_w - 20
y = 50 + text_h + offset_y # 增加 Y 轴偏移
cv2.rectangle(frame, (x - 10, y - text_h - 10), (x + text_w + 10, y + 10), (0, 0, 0), -1)
cv2.putText(frame, text, (x, y), font, font_scale, color, thickness)
if sub_text:
cv2.putText(frame, sub_text, (x, y + 40), font, 0.7, (200, 200, 200), 2)
def is_point_in_box(self, point, box):
px, py = point
x1, y1, x2, y2 = box
return x1 < px < x2 and y1 < py < y2
def is_pose_inside_detector_person(self, pose_bbox, dets_xyxy, dets_roles):
"""
判断一个 pose 人是否位于 detector 的 person 框内部(中心点匹配)
参数:
pose_bbox: [x1, y1, x2, y2]
dets_xyxy: detector 输出的所有 bbox 列表
dets_roles: 对应的类别列表(如 "person", "car"...
返回:
True -> 在某个人体框内部
False -> 不在任何人体框内部
"""
px1, py1, px2, py2 = pose_bbox
cx, cy = (px1 + px2) // 2, (py1 + py2) // 2
for box, role in zip(dets_xyxy, dets_roles):
if role != "person":
continue
dx1, dy1, dx2, dy2 = map(int, box)
# 判断中心点是否在 detector person 框内
if dx1 <= cx <= dx2 and dy1 <= cy <= dy2:
return True
return False
def count_pose_inside_detector_person(self, pose_results, dets_xyxy, dets_roles):
"""
统计有多少个pose框在detector person框内部
参数:
pose_results: pose检测结果列表每个元素为字典包含'bbox'键,值为[x1, y1, x2, y2]
dets_xyxy: detector输出的所有bbox列表
dets_roles: 对应的类别列表(如 "person", "car"...
返回:
int: 在detector person框内部的pose框数量
"""
count = 0
for pose in pose_results:
pose_bbox = pose['bbox'] # [x1, y1, x2, y2]
if self.is_pose_inside_detector_person(pose_bbox, dets_xyxy, dets_roles):
count += 1
return count
def process_frame(self, frame, camera_id: int, timestamp: float) -> Dict[str, Any]:
h, w = frame.shape[:2]
self.width, self.height = w, h
self.current_frame_idx += 1
# 性能计时开始
# total_start = time.time()
# ========= 每帧动态获取正确的 ROIint32=========
roi_points_int32 = self._get_roi_points(w, h) # shape: (4, 2), dtype: int32
roi_points_draw = roi_points_int32.reshape((-1, 1, 2)) # shape: (4, 1, 2) 用于绘制
current_time_sec = timestamp
# ========= 骨骼检测 =========
# pose_start = time.time()
# 耗时操作
pose_results = self.pose_detector(frame)
# pose_time = (time.time() - pose_start) * 1000
# ========= 主检测 =========
# detect_start = time.time()
# 耗时操作
detections = self.detector(frame)
# detect_time = (time.time() - detect_start) * 1000
dets_xyxy = []
dets_roles = []
dets_for_tracker = []
# ========= 当前帧所有警告列表(关键改动)==========
current_frame_alerts = [] # 每帧清空,重新收集
if detections:
for det in detections:
x1, y1, x2, y2, conf, cls_id = det # x1, y1, x2, y2为角点坐标x1 y1为左上角x2 y2为右下角
dets_xyxy.append([x1, y1, x2, y2])
dets_for_tracker.append([x1, y1, x2, y2, conf])
if cls_id == 0:
dets_roles.append("car")
elif cls_id == 1:
dets_roles.append("opentrunk")
elif cls_id == 2:
dets_roles.append("person")
elif cls_id == 3:
dets_roles.append("phone")
# logger.debug(f'dets_roles: {dets_roles}')
dets = np.array(dets_for_tracker, dtype=np.float32) if len(dets_for_tracker) else np.empty((0, 5))
tracks = self.tracker.update(
dets,
[self.height, self.width],
[self.height, self.width]
)
# logger.debug("tracks: {}".format(tracks))
# ========= 绘制骨骼 =========
frame = YOLOv8_Pose_ONNX.draw_keypoints(frame, pose_results)
# ========= 绘制 ROI =========
cv2.polylines(frame, [roi_points_draw], isClosed=True, color=(255, 0, 0), thickness=3)
# ========= 单帧统计变量 =========
current_roi_person_count = 0
current_roi_trunk_count = 0
current_roi_phone_count = 0
# 临时存储本帧的目标,用于后续关联分析
current_cars = [] # {'id':, 'box':}
current_trunks = [] # (cx, cy)
for t in tracks:
# logger.debug("t: {}".format(t))
tid = t.track_id
# cls_id = -1
# IoU 匹配角色
# if tid not in track_role and dets_xyxy:
REVALIDATE_FRAME_INTERVAL = 10
# if tid not in self.track_role:
if (self.current_frame_idx % REVALIDATE_FRAME_INTERVAL == 0) or (tid not in self.track_role):
best_iou = 0
best_role = "unknown"
t_box = list(map(float, t.tlbr)) # [x1,y1,x2,y2]
for i, box in enumerate(dets_xyxy):
iou_val = self.compute_iou(t_box, box)
if iou_val > best_iou:
best_iou = iou_val
best_role = dets_roles[i]
if best_iou > 0.1:
self.track_role[tid] = best_role
else:
self.track_role[tid] = "unknown"
role = self.track_role.get(tid, "unknown")
cls_id = -1
if role == "car":
cls_id = 0
elif role == "opentrunk":
cls_id = 1
elif role == "person":
cls_id = 2
elif role == "phone":
cls_id = 3
# logger.debug("tid: {}, role: {}, cls: {}".format(tid, role,cls_id))
x1, y1, x2, y2 = map(int, t.tlbr)
cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
color = None
label = None
if self.check_point_in_roi(roi_points_int32, (cx, cy)):
if cls_id == 0: # Car
color = (0, 255, 0)
current_cars.append({'id': tid, 'box': [x1, y1, x2, y2]})
if tid not in self.roi_car_registry:
self.roi_car_registry[tid] = {
'first_seen': self.current_frame_idx,
'last_seen': self.current_frame_idx,
'trunk_frames': 0,
'is_checked': False,
}
else:
self.roi_car_registry[tid]['last_seen'] = self.current_frame_idx
label = f"Car:{tid} IN"
elif cls_id == 1: # Opentrunk
current_roi_trunk_count += 1
color = (255, 165, 0)
current_trunks.append((cx, cy))
label = "OpenTrunk IN"
elif cls_id == 2: # Person
current_roi_person_count += 1
color = (255, 0, 255)
label = "Person IN"
elif cls_id == 3: # Phone主模型已支持
current_roi_phone_count += 1
color = (0, 0, 139)
else:
color = (255, 255, 255)
label = "Unknown"
# label = f"ID:{tid} IN"
# 特殊显示: 如果这辆车已经合格,框变蓝色
if cls_id == 0 and tid in self.roi_car_registry and self.roi_car_registry[tid][
'is_checked']:
color = (255, 255, 0) # Cyan for checked cars
label += " (Checked)"
else:
color = (0, 0, 255)
label = "OUT"
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
# ==========================================
# 4. 从骨骼点模型中统计ROI内人员数量
# ==========================================
self.pose_person_count = 0
# if pose_results[0].boxes is not None:
# pose_boxes = pose_results[0].boxes
# for box in pose_boxes:
# # 获取人体检测框的中心点
# x1, y1, x2, y2 = map(int, box.xyxy[0])
# cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
#
# # 判断中心点是否在ROI内
# if self.check_point_in_roi((cx, cy)):
# self.pose_person_count += 1
if pose_results:
for pose in pose_results:
x1, y1, x2, y2 = pose['bbox'][0], pose['bbox'][1], pose['bbox'][2], pose['bbox'][3]
cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
# 判断中心点是否在ROI内
if self.check_point_in_roi(roi_points_int32, (cx, cy)):
self.pose_person_count += 1
# 统计pose框在detector person框内部的数量
pose_inside_count = self.count_pose_inside_detector_person(pose_results, dets_xyxy, dets_roles)
# ==========================================
# 5. 关联分析: 哪个后备箱属于哪辆车?
# ==========================================
for car_info in current_cars:
c_id = car_info['id'] # 车的id
c_box = car_info['box'] # 车的框
trunk_found_for_this_car = False # 开后备箱标记
for t_pt in current_trunks:
if self.is_point_in_box(t_pt, c_box): # 如果开后备箱的框在车的框内,就设置开后备箱标记为true
trunk_found_for_this_car = True
break
if trunk_found_for_this_car: # 如果当前车辆的开后备箱标记为true了,就设置开了后备箱的帧数+1,凑够了判断“开后备箱”这个动作的帧数之后,就设置该车"已检查"
self.roi_car_registry[c_id]['trunk_frames'] += 1
if self.roi_car_registry[c_id]['trunk_frames'] >= self.frame_thresh_trunk_valid:
self.roi_car_registry[c_id]['is_checked'] = True
# ==========================================
# 6. 独立的手机检测逻辑(不与车辆绑定)
# ==========================================
if current_roi_phone_count > 0:
# 检测到手机框
self.phone_detection_frames += 1
self.phone_missing_frames = 0 # 重置丢失计数器
# 当检测累计达到阈值时,激活报警
if self.phone_detection_frames >= self.frame_thresh_phone:
self.phone_alert_active = True
else:
# 未检测到手机框
self.phone_missing_frames += 1
# 如果之前检测到手机,重置检测计数器
if self.phone_detection_frames > 0:
# 只有在连续丢失超过缓冲帧数时才重置
if self.phone_missing_frames >= self.frame_buffer_phone:
self.phone_detection_frames = 0
self.phone_alert_active = False
else:
# 从未检测到手机,保持状态
pass
# ==========================================
# 7. 制服检测逻辑(比较两个模型的人员数量)
# ==========================================
# 比较骨骼点模型和业务检测模型的人员数量
uniform_invalid = False
if self.pose_person_count > current_roi_person_count:
# 骨骼点模型检测到的人员多于业务检测模型
# 说明有人没穿执勤制服
uniform_invalid = True
self.uniform_detection_frames += 1
self.uniform_recovery_frames = 0 # 重置恢复计数器
# 当连续检测不合规达到阈值时,激活报警
if self.uniform_detection_frames >= self.frame_thresh_uniform:
self.uniform_alert_active = True
else:
# 人员数量匹配或业务模型检测更多(理论上不会)
self.uniform_recovery_frames += 1
# 如果之前有不合规检测,检查是否需要关闭报警
if self.uniform_detection_frames > 0:
# 只有在连续合规超过缓冲帧数时才重置
if self.uniform_recovery_frames >= self.frame_buffer_uniform:
self.uniform_detection_frames = 0
self.uniform_alert_active = False
else:
# 从未检测到不合规,保持状态
pass
# ==========================================
# 8. 维护车辆注册表 & 生成离场报警
# ==========================================
active_car_ids = []
cars_to_remove = []
for car_id, info in self.roi_car_registry.items():
# 遍历所有车辆,如果当前帧时间-该车辆最后可见的时间得到的值大于车辆消失时间阈值的话,就把该车添加到移除列表中,否则添加到活跃列表中
last_seen = info['last_seen']
if (self.current_frame_idx - last_seen) <= self.frame_buffer_limit_car:
active_car_ids.append(car_id)
else:
cars_to_remove.append(car_id)
# 执行删除 并 检查违规
for car_id in cars_to_remove:
# 遍历所有移除列表中的车辆,
# 如果该车辆最后出现时间-最早出现时间的值小于车辆最小存在时间,则判断为ignore,
# 如果该车辆的“已检查”标记为true,则
# 最后在所有车辆列表中删除该车辆
car_info = self.roi_car_registry[car_id]
duration_frames = car_info['last_seen'] - car_info['first_seen']
# 情况1通过时间太短 -> 归类为 Ignore (Too Fast)
if duration_frames < self.frame_thresh_car_min_duration:
logger.warning(f"ALARM: Car {car_id} passed too fast -> Regarded as Ignore Checked!")
self.fast_pass_alerts[car_id] = self.current_frame_idx + int(self.ignore_show_seconds * self.fps)
# 情况2时间够长但没检查后备箱 -> Unchecked Trunk
elif not car_info['is_checked']:
logger.warning(f"ALARM: Car {car_id} left without checking trunk!")
self.unchecked_trunk_alerts[car_id] = self.current_frame_idx + int(
self.openTrunk_show_seconds * self.fps)
del self.roi_car_registry[car_id]
effective_car_count = len(active_car_ids)
# ==========================================
# 9. 业务逻辑判定 (Only One / Nobody)
# ==========================================
# status_text = ""
#
# if effective_car_count > 0:
# # --- Only One ---
# if current_roi_person_count == 1:
# self.cnt_frame_one_person += 1
# self.cnt_missing_buffer_person = 0
# self.cnt_frame_nobody = 0
#
# # --- Nobody ---
# elif current_roi_person_count == 0:
# if self.cnt_frame_one_person > 0 and self.cnt_missing_buffer_person < self.frame_buffer_limit_person:
# self.cnt_frame_one_person += 1
# self.cnt_missing_buffer_person += 1
# self.cnt_frame_nobody = 0
# status_text = f"Person Buffer ({self.cnt_missing_buffer_person}/{self.frame_buffer_limit_person})"
# else:
# self.cnt_frame_one_person = 0
# self.cnt_missing_buffer_person = 0
# self.cnt_frame_nobody += 1
# else:
# self.cnt_frame_one_person = 0
# self.cnt_missing_buffer_person = 0
# self.cnt_frame_nobody = 0
# else:
# self.cnt_frame_one_person = 0
# self.cnt_missing_buffer_person = 0
# self.cnt_frame_nobody = 0
# ==========================================
# 9. 业务逻辑判定 (Only One / Nobody) - 重构版
# ==========================================
if effective_car_count >= 0: # 只要没人就检测,不用等到来了车再检测
# ----- 定义条件 -----
onlyone_condition = (pose_inside_count == 1)
nobody_condition = (current_roi_person_count == 0 and self.pose_person_count == 0)
# ----- Onlyone 计数器更新 -----
if onlyone_condition: # 如果骨骼点和检测框都检测到了只有一个人时,onlyone+1,当onlyone累计够了之后触发报警
self.onlyone_counter += 1
# self.onlyone_lost_counter = 0
elif current_roi_person_count > 1 or self.pose_person_count > 1:
self.onlyone_counter = 0
# if self.onlyone_counter > 0:
# self.onlyone_lost_counter += 1
# if self.onlyone_lost_counter > self.onlyone_buffer_limit:
# self.onlyone_counter = 0
# self.onlyone_lost_counter = 0
# ----- Nobody 计数器更新 -----
if nobody_condition:
self.nobody_counter += 1
# self.nobody_present_counter = 0
elif current_roi_person_count > 0 or self.pose_person_count > 0:
self.nobody_counter = 0
# if self.nobody_counter > 0:
# self.nobody_present_counter += 1
# if self.nobody_present_counter > self.nobody_buffer_limit:
# self.nobody_counter = 0
# self.nobody_present_counter = 0
else:
# 无活跃车辆,清零所有计数器
self.onlyone_counter = 0
# self.onlyone_lost_counter = 0
self.nobody_counter = 0
self.nobody_present_counter = 0
# ==========================================
# 10. 显示报警 (UI分层优化)
# ==========================================
# 更新调试信息,包含所有检测状态
phone_status = f"Phone: {current_roi_phone_count}"
if self.phone_alert_active:
phone_status += " (ALERT)"
uniform_status = f"Uniform: Pose={self.pose_person_count}, Model={current_roi_person_count}"
if self.uniform_alert_active:
uniform_status += " (INVALID!)"
debug_info = f"Cars: {len(active_car_ids)} | Person: {current_roi_person_count} | Trunk: {current_roi_trunk_count} | {phone_status}"
cv2.putText(frame, debug_info, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.putText(frame, uniform_status, (20, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# 使用 offset 实现报警堆叠,防止遮挡
alert_offset = 0
# 第一层:实时状态 (Real-time Status)
# ------------------------------------------------
# A. 显示 Only One
# if self.cnt_frame_one_person >= self.frame_thresh_one:
# current_frame_alerts.append(
# {
# 'time': current_time_sec,
# 'action': "Only One",
# }
# )
# self.draw_alert(frame, "Only One", (0, 255, 255), status_text, offset_y=alert_offset)
# alert_offset += 100
#
# # B. 显示 Nobody (实时状态)
# elif self.cnt_frame_nobody >= self.frame_thresh_nobody:
# current_frame_alerts.append(
# {
# 'time': current_time_sec,
# 'action': "Nobody",
# }
# )
# self.draw_alert(frame, "Nobody", (0, 0, 255), offset_y=alert_offset)
# alert_offset += 100
# A. 显示 Only One当累积帧数达到阈值时
if self.onlyone_counter >= self.onlyone_thresh:
current_frame_alerts.append({'time': current_time_sec, 'action': "Only One"})
self.draw_alert(frame, "Only One", (0, 255, 255), None, offset_y=alert_offset)
alert_offset += 100
# B. 显示 Nobody当累积帧数达到阈值时
elif self.nobody_counter >= self.nobody_thresh:
current_frame_alerts.append({'time': current_time_sec, 'action': "Nobody"})
self.draw_alert(frame, "Nobody", (0, 0, 255), None, offset_y=alert_offset)
alert_offset += 100
# C. 显示 Trunk Checked (在车辆存活期间)
for car_id in active_car_ids:
if car_id in self.roi_car_registry and self.roi_car_registry[car_id]['is_checked']:
current_frame_alerts.append(
{
'time': current_time_sec,
'action': "Trunk Checked",
}
)
self.draw_alert(frame, "Trunk Checked!!", (0, 255, 0), offset_y=alert_offset)
alert_offset += 100
break # 只显示一次
# D. 显示 Playing Phone独立检测不与车辆绑定
if self.phone_alert_active:
# 可以显示检测的持续时间
duration_seconds = self.phone_detection_frames / self.fps
# sub_text = f"Detected for {duration_seconds:.1f}s"
current_frame_alerts.append(
{
'time': current_time_sec,
'action': "Playing Phone",
}
)
self.draw_alert(frame, "Playing Phone", (255, 0, 0), None, offset_y=alert_offset)
alert_offset += 100
# # E. 新增:显示 Unvaild Uniform!!
# if self.uniform_alert_active:
# # 显示具体数量差异
# # diff = self.pose_person_count - current_roi_person_count
# #sub_text = f"Missing {diff} uniform(s)"
# current_frame_alerts.append(
# {
# 'time': current_time_sec,
# 'action': "Unvaild Uniform!!",
# }
# )
# self.draw_alert(frame, "Unvaild Uniform!!", (255, 165, 0), None, offset_y=alert_offset)
# alert_offset += 100
# 第二层:离场违规 (Post-Event Alerts)
# ------------------------------------------------
# F. 显示 Unchecked Trunk
expired_alerts = [cid for cid, end_frame in self.unchecked_trunk_alerts.items() if
self.current_frame_idx > end_frame]
for cid in expired_alerts:
del self.unchecked_trunk_alerts[cid]
if len(self.unchecked_trunk_alerts) > 0:
alert_text = f"Unchecked Trunk! (ID:{list(self.unchecked_trunk_alerts.keys())})"
current_frame_alerts.append(
{
'time': current_time_sec,
'action': "Unchecked Trunk",
}
)
self.draw_alert(frame, alert_text, (0, 0, 255), offset_y=alert_offset)
alert_offset += 100
# G. 显示 Ignore (离场结果)
expired_fast_alerts = [cid for cid, end_frame in self.fast_pass_alerts.items() if
self.current_frame_idx > end_frame]
for cid in expired_fast_alerts:
del self.fast_pass_alerts[cid]
if len(self.fast_pass_alerts) > 0:
alert_text = f"Ignore: (ID:{list(self.fast_pass_alerts.keys())})"
current_frame_alerts.append(
{
'time': current_time_sec,
'action': "Ignore",
}
)
self.draw_alert(frame, alert_text, (0, 0, 255), offset_y=alert_offset)
alert_offset += 100
# # ========= 性能统计和输出 =========
# total_time = (time.time() - total_start) * 1000
# logger.info(f"[PERF_DETAIL] Camera {camera_id} - ProcessFrame Total: {total_time:.1f}ms | "
# f"PoseDetect: {pose_time:.1f}ms | "
# f"MainDetect: {detect_time:.1f}ms | "
# )
return {
"image": frame,
"alerts": current_frame_alerts,
}
# ========================= 帧处理线程 =========================
class FrameProcessorWorker(threading.Thread):
def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None, post_workers: int = 4):
super().__init__(daemon=True)
self.raw_queue = raw_queue
self.ws_queue = ws_queue
self.stop_event = stop_event
# 将摄像头列表转换为字典key为id方便通过camera_id快速查找
self.cameras = {cam.id: cam for cam in cameras} if cameras is not None else {}
self.last_ts: Dict[int, float] = {}
# 每个摄像头一个独立的 Kadian 检测器实例
self.kadian_detectors: Dict[int, KadianDetector] = {}
self.last_alert_push_time: Dict[int, Dict[str, float]] = {}
# 线程池用于异步发送 POST 请求
self.post_executor = ThreadPoolExecutor(max_workers=post_workers, thread_name_prefix="alert_post")
def _post_alert(self, msg: dict):
"""异步发送告警 POST 请求(在线程池中执行)"""
try:
response = requests.post(ALERT_PUSH_URL, json=msg, timeout=5.0)
if response.status_code == 200:
print(f"[INFO] POST alert sent successfully for actions: {msg.get('result_type')}")
else:
print(f"[WARN] POST alert failed with status: {response.status_code}")
except Exception as e:
print(f"[ERROR] POST alert request failed: {e}")
def _encode_base64(self, img):
_, buf = cv2.imencode(".jpg", img)
return base64.b64encode(buf).decode("ascii")
def run(self):
target_interval = 1.0 / RTSP_TARGET_FPS
while not self.stop_event.is_set():
try:
item = self.raw_queue.get(timeout=0.5)
except queue.Empty:
continue
try:
cam_id = item["camera_id"]
ts = item["timestamp"]
frame = item["frame"]
# 抽帧控制
if ts - self.last_ts.get(cam_id, 0) < target_interval:
# self.raw_queue.task_done()
continue
self.last_ts[cam_id] = ts
# 获取检测器实例
if cam_id not in self.kadian_detectors:
camera_config = self.cameras.get(cam_id)
params = camera_config.params if camera_config else None
self.kadian_detectors[cam_id] = KadianDetector(params)
detector = self.kadian_detectors[cam_id]
# 执行检测
# detect_start = time.time()
result = detector.process_frame(frame.copy(), cam_id, ts)
# detect_time = (time.time() - detect_start) * 1000
result_img = result["image"]
result_type = result["alerts"]
# logger.debug(f"alerts: {result_type}")
# ========= 核心修改过滤5秒内重复的action =========
# 初始化当前摄像头的推送时间记录
if cam_id not in self.last_alert_push_time:
self.last_alert_push_time[cam_id] = {}
# 筛选出符合推送条件的action5秒内未推送过
push_actions = []
current_time = time.time()
for alert in result_type:
action = alert['action']
last_push = self.last_alert_push_time[cam_id].get(action, 0)
# 检查是否超过推送间隔
if current_time - last_push >= ALERT_PUSH_INTERVAL:
push_actions.append(action)
# 更新该action的最后推送时间
self.last_alert_push_time[cam_id][action] = current_time
# 通过 WebSocket 发送帧结果
# encode_start = time.time()
try:
img_b64 = self._encode_base64(result_img)
except Exception as e:
logger.error(f"[ERROR] Encode image failed: {e}")
img_b64 = None
# encode_time = (time.time() - encode_start) * 1000
if img_b64 is not None:
# 将abnormal_actions对象数组转换为字符串数组
# action_names = [action_info['action'] for action_info in push_actions]
msg = {
"msg_type": "frame",
"camera_id": item["camera_index"],
"timestamp": ts,
# "result_type": action_names,
"result_type": push_actions,
"image_base64": img_b64,
}
try:
self.ws_queue.put(msg, timeout=1.0)
if push_actions and len(push_actions) > 0:
# 异步发送 POST 请求(提交到线程池)
post_msg = msg.copy()
post_msg['type'] = 1
self.post_executor.submit(self._post_alert, post_msg)
except queue.Full:
logger.warning("[WARN] ws_send_queue full, drop frame message")
# # 打印关键操作的耗时
# total_time = detect_time + encode_time
# logger.info(f"[PERF] Camera {cam_id} - Total: {total_time:.1f}ms | "
# f"Detect: {detect_time:.1f}ms | "
# f"Encode: {encode_time:.1f}ms | ")
except Exception as e:
logger.error(f"[ERROR] Frame processing failed for camera {cam_id if 'cam_id' in locals() else 'unknown'}: {e}")
logger.exception("Exception details:") # 打印完整的堆栈跟踪
# 继续处理下一帧,不要退出循环
finally:
self.raw_queue.task_done()
# 线程退出时关闭线程池
self.post_executor.shutdown(wait=False)