# rtsp_service_kadian.py
# Fusion of Kadian_Detect_1221.py + rtsp_service_ws.py
# Supports multi-channel RTSP, frame sampling, segmented MP4 saving,
# and WebSocket push of annotated frames and alerts.
import cv2
import numpy as np
import os
import time
import threading
import queue
import yaml
import json
import base64
from typing import Dict, Any, Tuple, List

import requests

from common.contants import ALERT_PUSH_URL
# -------------------------- Kadian detection imports --------------------------
from algorithm.common.npu_yolo_onnx_person_car_phone import YOLOv8_ONNX  # main detector (person/phone)
from yolox.tracker.byte_tracker import BYTETracker
from utils.logger import get_logger

logger = get_logger(__name__)

# ========================= Configuration =========================
Person_Phone_Model = r'YOLO_Weight/person_phone_model.onnx'  # person + phone detection model
Smoke_Model = r'YOLO_Weight/smoke_model.onnx'                # smoking detection model
person_phone_input_size = 1280  # model input size; must match the size used at training time
smoke_input_size = 1280         # model input size; must match the size used at training time

# RTSP service configuration: frames are sampled down to this rate before detection.
RTSP_TARGET_FPS = 5.0

# Alert push rate limit (seconds): the same action on the same camera is
# pushed at most once per ALERT_PUSH_INTERVAL.
ALERT_PUSH_INTERVAL = 5.0


class ZhihuishiDetector:
    """Per-camera business-rule detector.

    Runs two ONNX YOLO models (person/phone and smoke) on each sampled frame,
    tracks detections with ByteTrack, and turns per-frame detections into
    debounced alerts ("Playing Phone", "Smoke", "Nobody Checking") using
    frame-count thresholds with a loss-tolerance buffer.

    NOTE: instances are stateful (streak counters, tracker state) and must be
    used by a single camera stream only.
    """

    # Re-validate a track's class role by IoU matching every N frames, so a
    # track whose underlying class changed does not keep a stale role forever.
    REVALIDATE_FRAME_INTERVAL = 10

    def __init__(self, params=None):
        # `params` is the per-camera config passed by FrameProcessorWorker;
        # currently unused but kept for interface compatibility.
        # --- model loading ---
        print(f"加载人和手机检测模型: {Person_Phone_Model}")
        self.person_phone_detector = YOLOv8_ONNX(
            Person_Phone_Model,
            conf_threshold=0.6,
            iou_threshold=0.45,
            input_size=person_phone_input_size,
        )
        print(f"加载抽烟检测模型: {Smoke_Model}")
        self.smoke_detector = YOLOv8_ONNX(
            Smoke_Model,
            conf_threshold=0.4,
            iou_threshold=0.65,
            input_size=smoke_input_size,
        )

        # --- ByteTracker configuration ---
        class TrackerArgs:
            track_thresh = 0.25
            track_buffer = 30
            match_thresh = 0.8
            mot20 = False

        self.fps = RTSP_TARGET_FPS
        self.person_phone_tracker = BYTETracker(TrackerArgs(), frame_rate=self.fps)
        self.smoke_tracker = BYTETracker(TrackerArgs(), frame_rate=self.fps)
        # track_id -> role ("phone"/"police"/"smoke"/"unknown"), refreshed by IoU matching
        self.person_phone_track_role = {}
        self.smoke_track_role = {}

        # ==========================================
        # Hyperparameters: business-rule time thresholds (seconds)
        # ==========================================
        self.TIME_THRESHOLD_NOBODY = 2.0  # how long "nobody present" must last before alerting
        self.TIME_TOLERANCE_NOBODY = 2.0  # buffer before a lost person resets the nobody streak
        self.TIME_THRESHOLD_SMOKE = 1.0   # how long smoking must be seen before alerting
        self.TIME_TOLERANCE_SMOKE = 0.5   # smoke-loss buffer (debounce)
        self.TIME_THRESHOLD_PHONE = 1.0   # how long phone use must be seen before alerting
        self.TIME_TOLERANCE_PHONE = 0.5   # phone-loss buffer (debounce)

        # Convert the time thresholds above into frame counts at self.fps.
        self.frame_thresh_nobody = int(self.TIME_THRESHOLD_NOBODY * self.fps)
        self.frame_buffer_nobody = int(self.TIME_TOLERANCE_NOBODY * self.fps)
        self.frame_thresh_smoke = int(self.TIME_THRESHOLD_SMOKE * self.fps)
        self.frame_buffer_smoke = int(self.TIME_TOLERANCE_SMOKE * self.fps)
        self.frame_thresh_phone = int(self.TIME_THRESHOLD_PHONE * self.fps)
        self.frame_buffer_phone = int(self.TIME_TOLERANCE_PHONE * self.fps)

        print(f"\n超参数设置:")
        print(f" FPS: {self.fps:.2f}")
        print(f" 判定 'Nobody' 需连续: {self.frame_thresh_nobody} 帧")
        print(f" 判定 'Smoke Detected' 需累计检测: {self.frame_thresh_smoke} 帧")
        print(f" 抽烟丢失缓冲帧数: {self.frame_buffer_smoke} 帧")
        print(f" 判定 'Phone Detected' 需累计检测: {self.frame_thresh_phone} 帧")
        print(f" 手机丢失缓冲帧数: {self.frame_buffer_phone} 帧")

        # ==========================================
        # State variables
        # ==========================================
        self.current_frame_idx = 0

        # "Nobody present" detection state
        self.nobody_detection_frames = 0   # consecutive frames with no person
        self.nobody_missing_frames = 0     # consecutive frames with a person (loss buffer)
        self.nobody_alert_active = False   # whether the nobody alert is active

        # Phone detection state
        self.phone_detection_frames = 0    # consecutive frames with a phone detected
        self.phone_missing_frames = 0      # consecutive frames without a phone
        self.phone_alert_active = False    # whether the phone alert is active

        # Smoking detection state
        self.smoke_detection_frames = 0    # consecutive frames with smoking detected
        self.smoke_missing_frames = 0      # consecutive frames without smoking
        self.smoke_alert_active = False    # whether the smoking alert is active

    def compute_iou(self, boxA, boxB):
        """Return the IoU of two axis-aligned boxes given as [x1, y1, x2, y2]."""
        xA = max(boxA[0], boxB[0])
        yA = max(boxA[1], boxB[1])
        xB = min(boxA[2], boxB[2])
        yB = min(boxA[3], boxB[3])
        interW = max(0, xB - xA)
        interH = max(0, yB - yA)
        interArea = interW * interH
        boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
        boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
        unionArea = boxAArea + boxBArea - interArea
        if unionArea == 0:
            return 0.0
        return interArea / unionArea

    def draw_alert(self, frame, text, color=(0, 0, 255), sub_text=None, offset_y=0):
        """Draw an alert banner in the top-right corner of `frame`.

        `offset_y` shifts the banner vertically so stacked alerts don't overlap.
        NOTE(review): relies on self.width, which is only set by process_frame —
        call this after at least one process_frame invocation.
        """
        font_scale = 1.5
        thickness = 3
        font = cv2.FONT_HERSHEY_SIMPLEX
        (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)
        x = self.width - text_w - 20
        y = 50 + text_h + offset_y  # apply the vertical offset
        cv2.rectangle(frame, (x - 10, y - text_h - 10), (x + text_w + 10, y + 10), (0, 0, 0), -1)
        cv2.putText(frame, text, (x, y), font, font_scale, color, thickness)
        if sub_text:
            cv2.putText(frame, sub_text, (x, y + 40), font, 0.7, (200, 200, 200), 2)

    def _match_track_role(self, track, det_boxes, det_roles, role_map):
        """Assign or refresh the class role of a tracked target by IoU matching.

        A role is (re)computed when the track is new, or periodically every
        REVALIDATE_FRAME_INTERVAL frames. A match needs IoU > 0.1, otherwise
        the role falls back to "unknown". Mutates `role_map` in place and
        returns the track's current role.
        """
        tid = track.track_id
        if (self.current_frame_idx % self.REVALIDATE_FRAME_INTERVAL == 0) or (tid not in role_map):
            best_iou = 0
            best_role = "unknown"
            t_box = list(map(float, track.tlbr))  # [x1, y1, x2, y2]
            for i, box in enumerate(det_boxes):
                iou_val = self.compute_iou(t_box, box)
                if iou_val > best_iou:
                    best_iou = iou_val
                    best_role = det_roles[i]
            role_map[tid] = best_role if best_iou > 0.1 else "unknown"
        return role_map.get(tid, "unknown")

    @staticmethod
    def _update_streak(detected, det_frames, miss_frames, active, frame_thresh, frame_buffer):
        """Debounced streak counter shared by phone / smoke / nobody logic.

        While `detected`, the detection streak grows and the alert activates
        once it reaches `frame_thresh`. When detection drops out, the streak
        (and the alert) is only reset after `frame_buffer` consecutive missing
        frames, which filters out single-frame detector flicker.

        Returns the updated (det_frames, miss_frames, active) triple.
        """
        if detected:
            det_frames += 1
            miss_frames = 0  # reset the loss counter
            if det_frames >= frame_thresh:
                active = True
        else:
            miss_frames += 1
            # Only reset once the loss persists past the buffer; if nothing was
            # ever detected (det_frames == 0) there is nothing to reset.
            if det_frames > 0 and miss_frames >= frame_buffer:
                det_frames = 0
                active = False
        return det_frames, miss_frames, active

    def process_frame(self, frame, camera_id: int, timestamp: float) -> Dict[str, Any]:
        """Run detection + tracking + business rules on one frame.

        Draws annotations onto `frame` in place and returns
        {"image": annotated frame, "alerts": list of alert dicts}, where each
        alert has keys time / action / confidence / details.
        """
        h, w = frame.shape[:2]
        self.width, self.height = w, h
        self.current_frame_idx += 1
        current_time_sec = timestamp

        # ========= run both detectors =========
        person_phone_results = self.person_phone_detector(frame)
        smoke_results = self.smoke_detector(frame)

        person_phone_dets_xyxy = []
        person_phone_dets_roles = []
        person_phone_dets_for_tracker = []
        smoke_dets_xyxy = []
        smoke_dets_roles = []
        smoke_dets_for_tracker = []

        # Alerts are recollected from scratch every frame.
        current_frame_alerts = []

        # ---- collect person/phone detections ----
        if person_phone_results:
            for det in person_phone_results:
                # (x1, y1) top-left corner, (x2, y2) bottom-right corner
                x1, y1, x2, y2, conf, cls_id = det
                person_phone_dets_xyxy.append([x1, y1, x2, y2])
                person_phone_dets_for_tracker.append([x1, y1, x2, y2, conf])
                if cls_id == 0:
                    person_phone_dets_roles.append("phone")
                elif cls_id == 1:
                    person_phone_dets_roles.append("police")
        person_phone_dets = np.array(person_phone_dets_for_tracker, dtype=np.float32) if len(
            person_phone_dets_for_tracker) else np.empty((0, 5))
        person_phone_tracks = self.person_phone_tracker.update(
            person_phone_dets, [self.height, self.width], [self.height, self.width]
        )

        # ---- collect smoke detections ----
        if smoke_results:
            for det in smoke_results:
                x1, y1, x2, y2, conf, cls_id = det
                smoke_dets_xyxy.append([x1, y1, x2, y2])
                smoke_dets_for_tracker.append([x1, y1, x2, y2, conf])
                if cls_id == 0:
                    smoke_dets_roles.append("smoke")
        smoke_dets = np.array(smoke_dets_for_tracker, dtype=np.float32) if len(
            smoke_dets_for_tracker) else np.empty((0, 5))
        smoke_tracks = self.smoke_tracker.update(
            smoke_dets, [self.height, self.width], [self.height, self.width]
        )

        # ========= per-frame counts =========
        current_person_count = 0
        current_phone_count = 0
        current_smoke_count = 0

        # ========= person/phone tracks: role assignment + drawing =========
        for t in person_phone_tracks:
            role = self._match_track_role(
                t, person_phone_dets_xyxy, person_phone_dets_roles, self.person_phone_track_role
            )
            cls_id = -1
            if role == "phone":
                cls_id = 0
            elif role == "police":
                cls_id = 1

            x1, y1, x2, y2 = map(int, t.tlbr)
            if cls_id == 0:  # phone
                current_phone_count += 1
                color = (255, 0, 255)
                label = "Phone"
            elif cls_id == 1:  # person
                current_person_count += 1
                color = (0, 0, 139)
                label = "Person"
            else:
                color = (255, 255, 255)
                label = "Unknown"
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        # ========= smoke tracks: role assignment + drawing =========
        for t in smoke_tracks:
            role = self._match_track_role(
                t, smoke_dets_xyxy, smoke_dets_roles, self.smoke_track_role
            )
            cls_id = 0 if role == "smoke" else -1

            x1, y1, x2, y2 = map(int, t.tlbr)
            if cls_id == 0:  # smoking
                current_smoke_count += 1
                color = (255, 255, 0)
                label = "Smoke"
            else:
                color = (255, 255, 255)
                label = "Unknown"
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        # ========= debounced business-rule state updates =========
        # Phone usage
        (self.phone_detection_frames,
         self.phone_missing_frames,
         self.phone_alert_active) = self._update_streak(
            current_phone_count > 0,
            self.phone_detection_frames, self.phone_missing_frames, self.phone_alert_active,
            self.frame_thresh_phone, self.frame_buffer_phone,
        )
        # Smoking
        (self.smoke_detection_frames,
         self.smoke_missing_frames,
         self.smoke_alert_active) = self._update_streak(
            current_smoke_count > 0,
            self.smoke_detection_frames, self.smoke_missing_frames, self.smoke_alert_active,
            self.frame_thresh_smoke, self.frame_buffer_smoke,
        )
        # Nobody present (here "detected" means no person is visible)
        (self.nobody_detection_frames,
         self.nobody_missing_frames,
         self.nobody_alert_active) = self._update_streak(
            current_person_count == 0,
            self.nobody_detection_frames, self.nobody_missing_frames, self.nobody_alert_active,
            self.frame_thresh_nobody, self.frame_buffer_nobody,
        )

        # ========= build structured alerts =========
        # A. Playing Phone
        if self.phone_alert_active:
            duration_seconds = self.phone_detection_frames / self.fps
            current_frame_alerts.append({
                'time': current_time_sec,
                'action': 'Playing Phone',
                'confidence': 1.0,  # fixed at 1.0 (rule-based decision)
                'details': f"Detected for {duration_seconds:.1f}s",
            })
        # B. Smoke
        if self.smoke_alert_active:
            duration_seconds = self.smoke_detection_frames / self.fps
            current_frame_alerts.append({
                'time': current_time_sec,
                'action': 'Smoke',
                'confidence': 1.0,  # fixed at 1.0 (rule-based decision)
                'details': f"Detected for {duration_seconds:.1f}s",
            })
        # C. Nobody Checking
        if self.nobody_alert_active:
            duration_seconds = self.nobody_detection_frames / self.fps
            current_frame_alerts.append({
                'time': current_time_sec,
                'action': 'Nobody Checking',
                'confidence': 1.0,
                'details': f"Detected for {duration_seconds:.1f}s",
            })

        # ========= overlay: debug counts + unified alert list =========
        debug_info = f"Person: {current_person_count} | Phone: {current_phone_count} | Smoke: {current_smoke_count}"
        cv2.putText(frame, debug_info, (20, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        alert_y_start = 150
        for i, alert in enumerate(current_frame_alerts):
            action = alert['action']
            details = alert.get('details', '')
            color = (0, 0, 255)  # default: red warning
            if action == 'Nobody Checking':
                color = (255, 255, 255)
            elif action == 'Smoke':
                color = (0, 0, 255)
            elif action == 'Playing Phone':
                color = (255, 0, 0)
            main_text = action
            if details:
                main_text += f" ({details})"
            y_pos = alert_y_start + i * 50
            cv2.rectangle(frame, (20, y_pos - 40), (900, y_pos + 10), (0, 0, 0), -1)
            cv2.putText(frame, main_text, (30, y_pos),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)

        return {
            "image": frame,
            "alerts": current_frame_alerts,
        }


# ========================= frame-processing worker =========================
class FrameProcessorWorker(threading.Thread):
    """Daemon thread: pulls raw frames, runs per-camera detection, pushes results.

    Frames arrive on `raw_queue` as dicts with keys camera_id / camera_index /
    timestamp / frame; annotated frames and deduplicated alerts go out on
    `ws_queue` (and, for new alerts, via HTTP POST to ALERT_PUSH_URL).
    """

    def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue,
                 stop_event: threading.Event, cameras=None):
        super().__init__(daemon=True)
        self.raw_queue = raw_queue
        self.ws_queue = ws_queue
        self.stop_event = stop_event
        self.cameras = {cam.id: cam for cam in cameras} if cameras is not None else {}
        # Last processed timestamp per camera (for frame-rate gating).
        self.last_ts: Dict[int, float] = {}
        # One independent detector instance per camera (detectors are stateful).
        self.kadian_detectors: Dict[int, ZhihuishiDetector] = {}
        # camera_id -> {action -> last wall-clock push time}, for alert deduplication.
        self.last_alert_push_time: Dict[int, Dict[str, float]] = {}

    def _encode_base64(self, img):
        """JPEG-encode `img` and return it as an ASCII base64 string."""
        _, buf = cv2.imencode(".jpg", img)
        return base64.b64encode(buf).decode("ascii")

    def _filter_new_actions(self, cam_id, alerts):
        """Return the action names from `alerts` not pushed within ALERT_PUSH_INTERVAL.

        Side effect: records "now" as the last push time for every returned
        action on this camera (the push time is stamped even if the subsequent
        send later fails — best effort, matches prior behavior).
        """
        bucket = self.last_alert_push_time.setdefault(cam_id, {})
        push_actions = []
        current_time = time.time()
        for alert in alerts:
            action = alert['action']
            if current_time - bucket.get(action, 0) >= ALERT_PUSH_INTERVAL:
                push_actions.append(action)
                bucket[action] = current_time
        return push_actions

    def run(self):
        target_interval = 1.0 / RTSP_TARGET_FPS
        while not self.stop_event.is_set():
            try:
                item = self.raw_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            try:
                cam_id = item["camera_id"]
                ts = item["timestamp"]
                frame = item["frame"]

                # Frame-rate gating: drop frames arriving faster than the target FPS.
                if ts - self.last_ts.get(cam_id, 0) < target_interval:
                    continue  # task_done still fires via the finally block
                self.last_ts[cam_id] = ts

                # Lazily create one detector per camera.
                if cam_id not in self.kadian_detectors:
                    camera_config = self.cameras.get(cam_id)
                    params = camera_config.params if camera_config else None
                    self.kadian_detectors[cam_id] = ZhihuishiDetector(params)
                detector = self.kadian_detectors[cam_id]

                # Detect on a copy so the original frame stays unmodified.
                result = detector.process_frame(frame.copy(), cam_id, ts)
                result_img = result["image"]
                result_type = result["alerts"]

                # Keep only actions not pushed within the last ALERT_PUSH_INTERVAL.
                push_actions = self._filter_new_actions(cam_id, result_type)

                try:
                    img_b64 = self._encode_base64(result_img)
                except Exception as e:
                    logger.error(f"[ERROR] Encode image failed: {e}")
                    img_b64 = None

                if img_b64 is not None:
                    msg = {
                        "msg_type": "frame",
                        # NOTE(review): intentionally camera_index, not cam_id —
                        # confirm downstream consumers expect the display index.
                        "camera_id": item["camera_index"],
                        "timestamp": ts,
                        "result_type": push_actions,
                        "image_base64": img_b64,
                    }
                    try:
                        self.ws_queue.put(msg, timeout=1.0)
                        if push_actions:
                            # POST the alert (with image) to the alert service.
                            post_msg = msg.copy()
                            post_msg['type'] = 2
                            try:
                                response = requests.post(ALERT_PUSH_URL, json=post_msg, timeout=5.0)
                                if response.status_code == 200:
                                    logger.info(f"[INFO] POST alert sent successfully for actions: {push_actions}")
                                else:
                                    logger.warning(f"[WARN] POST alert failed with status: {response.status_code}")
                            except Exception as e:
                                logger.error(f"[ERROR] POST alert request failed: {e}")
                    except queue.Full:
                        logger.warning("[WARN] ws_send_queue full, drop frame message")
            except Exception as e:
                logger.error(
                    f"[ERROR] Frame processing failed for camera {cam_id if 'cam_id' in locals() else 'unknown'}: {e}")
                logger.exception("Exception details:")  # full stack trace
                # keep processing the next frame; never exit the loop
            finally:
                self.raw_queue.task_done()