引入indoor biz
This commit is contained in:
236
biz/prison/indoor_biz.py
Normal file
236
biz/prison/indoor_biz.py
Normal file
@@ -0,0 +1,236 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
import time
|
||||
import requests
|
||||
from biz.base_frame_processor import BaseFrameProcessorWorker
|
||||
from algorithm.common.npu_yolo_onnx_person_car_phone import YOLOv8_ONNX
|
||||
from yolox.tracker.byte_tracker import BYTETracker
|
||||
|
||||
# ========================= 走廊场景专属配置 =========================
|
||||
MODEL_PATH = 'YOLO_Weight/kanshousuo.onnx' # 犯人检测onnx模型路径
|
||||
INPUT_SIZE = 640 # 模型输入尺寸
|
||||
RTSP_FPS = 10 # 视频流目标FPS
|
||||
ALERT_PUSH_INTERVAL = 5 # 相同报警5秒内仅推送1次
|
||||
ALERT_PUSH_URL = "http://123.57.151.210:10000/picenter/websocket/test/process"
|
||||
# 消失判定:中心点在ROI内消失后,持续无检测的帧数(1.0秒,可微调)
|
||||
ROI_LOST_FRAMES_THRESH = int(0.5 * RTSP_FPS)
|
||||
|
||||
# ========================= 5个ROI区域配置(相对坐标,适配任意分辨率) =========================
|
||||
# 格式:{ROI名称: [[x1,y1], [x2,y2], ...], ...} (多边形顶点,顺/逆时针均可)
|
||||
# 相对坐标:x/y 0~1(0=左/上,1=右/下),可直接根据场景调整
|
||||
ROI_CONFIG = {
|
||||
"left_door_1": [[0.195, 0.242], [0.265, 0.17], [0.3, 0.63] ,[0.248, 0.8]], # 左侧1门ROI
|
||||
"left_door_2": [[0.3, 0.1], [0.34, 0.08], [0.35, 0.43], [0.322, 0.52]], # 左侧2门 ROI
|
||||
"left_door_3": [[0.355, 0.06], [0.42, 0], [0.42, 0.18], [0.362, 0.36]], # 左侧3门ROI
|
||||
"right_door_1": [[0.735, 0.142], [0.81, 0.22], [0.78, 0.8], [0.715, 0.65]], # 右侧1门 ROI
|
||||
"right_door_2": [[0.65, 0.06], [0.7, 0.09], [0.69, 0.5], [0.65, 0.4]] # 右侧2门ROI
|
||||
}
|
||||
|
||||
# ==================================================================================
|
||||
class PrisonerDoorDetector:
|
||||
def __init__(self, params=None):
|
||||
self.params = params or {}
|
||||
# 1. 加载YOLO模型(仅提取犯人检测结果)
|
||||
self.detector = YOLOv8_ONNX(
|
||||
MODEL_PATH,
|
||||
conf_threshold=0.5, # 置信度阈值,可根据模型精度调整
|
||||
iou_threshold=0.45, # IOU阈值
|
||||
input_size=INPUT_SIZE
|
||||
)
|
||||
|
||||
# 2. 初始化ByteTracker跟踪器(适配走廊单/多犯人跟踪)
|
||||
class TrackerArgs:
|
||||
track_thresh = 0.25
|
||||
track_buffer = 20 # 减小缓冲避免跟踪漂移
|
||||
match_thresh = 0.75
|
||||
mot20 = False
|
||||
self.tracker = BYTETracker(TrackerArgs(), frame_rate=RTSP_FPS)
|
||||
|
||||
# 3. 状态变量初始化
|
||||
self.last_alert_time = 0.0 # 最后报警时间(防重复推送)
|
||||
# 犯人跟踪信息:{track_id: {'is_cx_in_roi': 中心点是否在ROI, 'lost_frames': 消失帧数, 'lost_roi': 消失的ROI名称, 'last_cxcy': 最后中心点坐标}}
|
||||
self.prisoner_track_info = {}
|
||||
self.frame_width = 0 # 帧宽度(动态获取)
|
||||
self.frame_height = 0 # 帧高度(动态获取)
|
||||
self.roi_abs_cache = {} # ROI绝对坐标缓存:{roi_name: np.int32数组}
|
||||
|
||||
def compute_iou(self, boxA, boxB):
|
||||
"""IOU计算:匹配跟踪框与犯人检测框,过滤非犯人目标"""
|
||||
xA = max(boxA[0], boxB[0])
|
||||
yA = max(boxA[1], boxB[1])
|
||||
xB = min(boxA[2], boxB[2])
|
||||
yB = min(boxA[3], boxB[3])
|
||||
interW = max(0, xB - xA)
|
||||
interH = max(0, yB - yA)
|
||||
interArea = interW * interH
|
||||
boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
|
||||
boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
|
||||
unionArea = boxAArea + boxBArea - interArea
|
||||
return interArea / unionArea if unionArea > 0 else 0.0
|
||||
|
||||
def _get_roi_abs(self, roi_name):
|
||||
"""相对坐标转绝对像素坐标(适配当前帧分辨率,OpenCV要求int32)"""
|
||||
if roi_name not in ROI_CONFIG:
|
||||
return None
|
||||
roi_rel = np.array(ROI_CONFIG[roi_name], dtype=np.float64)
|
||||
roi_abs = roi_rel * np.array([self.frame_width, self.frame_height])
|
||||
return roi_abs.astype(np.int32)
|
||||
|
||||
def is_cxcy_in_roi(self, cx, cy):
|
||||
"""判断犯人框**中心点(cx,cy)** 是否在任意ROI内,返回:(是否在ROI, 所在ROI名称)"""
|
||||
for roi_name, roi_abs in self.roi_abs_cache.items():
|
||||
# OpenCV点在多边形内判定:>=0 表示在内部/边上
|
||||
if cv2.pointPolygonTest(roi_abs, (cx, cy), False) >= 0:
|
||||
return (True, roi_name)
|
||||
return (False, "outside")
|
||||
|
||||
def push_alert(self, camera_id, track_id, lost_roi, last_cxcy, timestamp):
|
||||
"""报警推送:带频率限制,携带消失ROI、最后中心点坐标"""
|
||||
current_time = time.time()
|
||||
if current_time - self.last_alert_time < ALERT_PUSH_INTERVAL:
|
||||
return False
|
||||
# 构造报警信息(可根据平台要求扩展字段)
|
||||
alert_info = {
|
||||
"camera_id": camera_id,
|
||||
"alert_type": "prisoner_cx_disappear_in_roi",
|
||||
"prisoner_track_id": track_id,
|
||||
"disappear_roi": lost_roi,
|
||||
"last_cx": round(last_cxcy[0], 2),
|
||||
"last_cy": round(last_cxcy[1], 2),
|
||||
"timestamp": timestamp,
|
||||
"details": f"犯人框中心点在{lost_roi}区域内消失,触发报警"
|
||||
}
|
||||
# 推送报警请求
|
||||
try:
|
||||
requests.post(ALERT_PUSH_URL, json=alert_info, timeout=3)
|
||||
print(f"[报警成功] {alert_info}")
|
||||
self.last_alert_time = current_time
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[报警失败] 原因:{str(e)}")
|
||||
return False
|
||||
|
||||
def process_frame(self, frame, camera_id: int, timestamp: float) -> dict:
|
||||
"""
|
||||
核心帧处理:
|
||||
1. 绘制5个ROI区域 2. 检测+跟踪犯人 3. 判定中心点是否在ROI内
|
||||
4. 中心点在ROI内消失则累计帧数,达到阈值触发报警
|
||||
"""
|
||||
self.frame_height, self.frame_width = frame.shape[:2]
|
||||
current_frame_alerts = [] # 本帧报警信息
|
||||
|
||||
# ========================= 1. 初始化ROI绝对坐标并绘制5个ROI =========================
|
||||
roi_colors = { # 各ROI绘制颜色(自定义区分)
|
||||
"left_door_1": (255, 0, 0), "left_door_2": (0, 255, 0),
|
||||
"left_door_3": (0, 0, 255), "right_door_1": (255, 255, 0),
|
||||
"right_door_2": (255, 165, 0)
|
||||
}
|
||||
self.roi_abs_cache.clear()
|
||||
for roi_name, _ in ROI_CONFIG.items():
|
||||
roi_abs = self._get_roi_abs(roi_name)
|
||||
if roi_abs is None:
|
||||
continue
|
||||
self.roi_abs_cache[roi_name] = roi_abs
|
||||
# 绘制ROI多边形(闭合)+ ROI名称标签
|
||||
roi_draw = roi_abs.reshape((-1, 1, 2)) # OpenCV绘制要求形状 (n,1,2)
|
||||
cv2.polylines(frame, [roi_draw], isClosed=True, color=roi_colors[roi_name], thickness=2)
|
||||
cv2.putText(frame, roi_name, (roi_abs[0][0], roi_abs[0][1] - 5),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.5, roi_colors[roi_name], 2)
|
||||
|
||||
# ========================= 2. 模型推理:仅提取犯人检测框 =========================
|
||||
detect_results = self.detector(frame)
|
||||
prisoner_dets_xyxy = [] # 仅存犯人检测框 [x1,y1,x2,y2]
|
||||
dets_for_tracker = [] # 跟踪器输入 [x1,y1,x2,y2,conf]
|
||||
if detect_results:
|
||||
for det in detect_results:
|
||||
x1, y1, x2, y2, conf, cls_id = det
|
||||
dets_for_tracker.append([x1, y1, x2, y2, conf])
|
||||
# 替换为你模型中「犯人」的实际类别ID,此处默认cls_id=1
|
||||
if cls_id == 1:
|
||||
prisoner_dets_xyxy.append([x1, y1, x2, y2])
|
||||
|
||||
# ========================= 3. 目标跟踪:更新犯人跟踪结果 =========================
|
||||
dets_np = np.array(dets_for_tracker, dtype=np.float32) if dets_for_tracker else np.empty((0, 5))
|
||||
track_results = self.tracker.update(dets_np, [self.frame_height, self.frame_width],
|
||||
[self.frame_height, self.frame_width])
|
||||
|
||||
# ========================= 4. 遍历跟踪结果:判定犯人中心点是否在ROI =========================
|
||||
current_prisoner_tids = set() # 本帧存在的犯人track_id
|
||||
for track in track_results:
|
||||
track_id = track.track_id
|
||||
track_box = list(map(float, track.tlbr)) # 跟踪框 [x1,y1,x2,y2]
|
||||
# IOU匹配:过滤非犯人目标,仅保留真正的犯人
|
||||
is_prisoner = False
|
||||
for p_box in prisoner_dets_xyxy:
|
||||
if self.compute_iou(track_box, p_box) > 0.3:
|
||||
is_prisoner = True
|
||||
break
|
||||
if not is_prisoner:
|
||||
continue
|
||||
|
||||
# 计算犯人框**中心点坐标**(核心判定依据)
|
||||
cx = (track_box[0] + track_box[2]) / 2
|
||||
cy = (track_box[1] + track_box[3]) / 2
|
||||
# 判定中心点是否在ROI内,返回(是否在ROI, 所在ROI名称)
|
||||
is_cx_in_roi, current_roi = self.is_cxcy_in_roi(cx, cy)
|
||||
# 更新犯人跟踪信息:记录中心点状态、所在ROI、最后坐标,重置消失帧数
|
||||
self.prisoner_track_info[track_id] = {
|
||||
"is_cx_in_roi": is_cx_in_roi,
|
||||
"lost_frames": 0,
|
||||
"lost_roi": current_roi,
|
||||
"last_cxcy": (cx, cy)
|
||||
}
|
||||
current_prisoner_tids.add(track_id)
|
||||
|
||||
# 绘制犯人框+中心点+状态标签(可视化调试)
|
||||
x1, y1, x2, y2 = map(int, track_box)
|
||||
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2) # 红色犯人框
|
||||
cv2.circle(frame, (int(cx), int(cy)), 5, (0, 255, 255), -1) # 黄色中心点
|
||||
cv2.putText(frame, f"Prisoner_{track_id}({current_roi})", (x1, y1 - 10),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
|
||||
|
||||
# ========================= 5. 核心判定:中心点在ROI内消失则报警 =========================
|
||||
for track_id in list(self.prisoner_track_info.keys()):
|
||||
if track_id not in current_prisoner_tids:
|
||||
# 犯人本帧消失,获取其最后状态
|
||||
track_info = self.prisoner_track_info[track_id]
|
||||
# 仅处理「**中心点原本在ROI内**」的消失情况
|
||||
if track_info["is_cx_in_roi"]:
|
||||
track_info["lost_frames"] += 1 # 累计消失帧数
|
||||
# 消失帧数达到阈值,触发报警
|
||||
if track_info["lost_frames"] >= ROI_LOST_FRAMES_THRESH:
|
||||
self.push_alert(
|
||||
camera_id=camera_id,
|
||||
track_id=track_id,
|
||||
lost_roi=track_info["lost_roi"],
|
||||
last_cxcy=track_info["last_cxcy"],
|
||||
timestamp=timestamp
|
||||
)
|
||||
# 记录本帧报警信息
|
||||
current_frame_alerts.append({
|
||||
"time": timestamp,
|
||||
"camera_id": camera_id,
|
||||
"action": "prisoner_cx_disappear_in_door",
|
||||
"prisoner_track_id": track_id,
|
||||
"disappear_roi": track_info["lost_roi"],
|
||||
"last_cx": round(track_info["last_cxcy"][0], 2),
|
||||
"last_cy": round(track_info["last_cxcy"][1], 2)
|
||||
})
|
||||
del self.prisoner_track_info[track_id] # 报警后清除状态,避免重复触发
|
||||
else:
|
||||
del self.prisoner_track_info[track_id] # 中心点不在ROI的消失,直接清除
|
||||
|
||||
# ========================= 6. 绘制辅助信息(摄像头ID、在押犯人数) =========================
|
||||
cv2.putText(frame, f"Camera: {camera_id}", (20, self.frame_height - 20),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
|
||||
cv2.putText(frame, f"Prisoners: {len(current_prisoner_tids)}", (20, self.frame_height - 50),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
|
||||
|
||||
return {"image": frame, "alerts": current_frame_alerts}
|
||||
|
||||
# ========================= 帧处理线程(对接原有框架,直接复用) =========================
|
||||
class FrameProcessorWorker(BaseFrameProcessorWorker):
|
||||
"""看守所走廊犯人检测 - 5ROI+中心点消失判定"""
|
||||
DETECTOR_FACTORY = lambda params: PrisonerDoorDetector(params)
|
||||
POST_TYPE = 3 # 与原有业务区分,自定义即可
|
||||
TARGET_FPS = RTSP_FPS
|
||||
Reference in New Issue
Block a user