From 4d010e45e36b5b9598f3d5981630c1dae0c664c1 Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Wed, 4 Mar 2026 18:17:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E5=8F=96frame=20processor=E5=88=B0bas?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- biz/base_frame_processor.py | 195 +++++++++++++++++++++++++++++ biz/checkpoint/checkpoint_biz.py | 146 ++------------------- biz/prison/supervision_room_biz.py | 134 ++------------------ biz/prison/trajectory02_biz.py | 128 ++----------------- 4 files changed, 221 insertions(+), 382 deletions(-) create mode 100644 biz/base_frame_processor.py diff --git a/biz/base_frame_processor.py b/biz/base_frame_processor.py new file mode 100644 index 0000000..f03fccc --- /dev/null +++ b/biz/base_frame_processor.py @@ -0,0 +1,195 @@ +# base_frame_processor.py +# 通用帧处理工作线程基类 +# 统一异步POST、告警去重、抽帧控制等公共逻辑 + +import cv2 +import base64 +import time +import threading +import queue +import requests +from typing import Dict, Any, Callable +from concurrent.futures import ThreadPoolExecutor + +from common.contants import ALERT_PUSH_URL +from utils.logger import get_logger + +logger = get_logger(__name__) + +# 告警推送频率限制(秒) +ALERT_PUSH_INTERVAL = 5.0 + + +class BaseFrameProcessorWorker(threading.Thread): + """ + 通用帧处理工作线程基类 + + 功能: + - 统一异步POST(线程池) + - 统一告警去重逻辑 + - 统一抽帧控制 + - 统一异常处理 + + 子类仅需提供: + - DETECTOR_FACTORY: 检测器工厂函数 + - POST_TYPE: POST请求的type值 + - TARGET_FPS: 目标帧率 + """ + + # ========== 子类必须重写的类常量 ========== + DETECTOR_FACTORY: Callable = None # 检测器工厂函数 + POST_TYPE: int = 2 # POST type + TARGET_FPS: float = 10.0 # 目标帧率 + + def __init__(self, + raw_queue: queue.Queue, + ws_queue: queue.Queue, + stop_event: threading.Event, + cameras=None, + post_workers: int = 4): + super().__init__(daemon=True) + self.raw_queue = raw_queue + self.ws_queue = ws_queue + self.stop_event = stop_event + # 将摄像头列表转换为字典,key为id,方便通过camera_id快速查找 + self.cameras = {cam.id: cam for cam in cameras} if cameras is not None else {} + + self.last_ts: Dict[int, float] = {} + + # 检测器实例缓存 + self.detectors: Dict[int, Any] = {} + + # 告警去重记录 {camera_id: {action: last_push_time}} + self.last_alert_push_time: Dict[int, Dict[str, float]] = {} + + # 异步POST线程池 + self.post_executor = ThreadPoolExecutor( + max_workers=post_workers, + thread_name_prefix="alert_post" + ) + + def _encode_image_to_base64(self, img) -> str: + """图像编码为 Base64""" + ok, buf = cv2.imencode(".jpg", img) + if not ok: + raise RuntimeError("Failed to encode image to JPEG") + return base64.b64encode(buf.tobytes()).decode("ascii") + + def _post_alert(self, msg: dict): + """异步发送告警 POST 请求(在线程池中执行)""" + try: + response = requests.post(ALERT_PUSH_URL, json=msg, timeout=5.0) + if response.status_code == 200: + print(f"[INFO] POST alert sent successfully for actions: {msg.get('result_type')}") + else: + print(f"[WARN] POST alert failed with status: {response.status_code}") + except Exception as e: + print(f"[ERROR] POST alert request failed: {e}") + + def _create_detector(self, params): + """创建检测器实例""" + # 使用 type(self) 访问类属性,避免 lambda 被绑定 self 参数 + factory = type(self).DETECTOR_FACTORY + if factory is None: + raise NotImplementedError("子类必须提供 DETECTOR_FACTORY") + return factory(params) + + def _filter_duplicate_alerts(self, cam_id: int, alerts: list, current_time: float) -> list: + """ + 过滤5秒内重复的告警 + + Args: + cam_id: 摄像头ID + alerts: 当前帧的告警列表 + current_time: 当前时间戳 + + Returns: + 符合推送条件的action列表 + """ + if cam_id not in self.last_alert_push_time: + self.last_alert_push_time[cam_id] = {} + + push_actions = [] + for alert in alerts: + action = alert['action'] + last_push = self.last_alert_push_time[cam_id].get(action, 0) + # 检查是否超过推送间隔 + if current_time - last_push >= ALERT_PUSH_INTERVAL: + push_actions.append(action) + # 更新该action的最后推送时间 + self.last_alert_push_time[cam_id][action] = current_time + + return push_actions + + def run(self): + """主循环 - 模板方法""" + target_interval = 1.0 / self.TARGET_FPS + + while not self.stop_event.is_set(): + try: + item = self.raw_queue.get(timeout=0.5) + except queue.Empty: + continue + + try: + cam_id = item["camera_id"] + ts = item["timestamp"] + frame = item["frame"] + + # 抽帧控制 + if ts - self.last_ts.get(cam_id, 0) < target_interval: + continue + self.last_ts[cam_id] = ts + + # 获取或创建检测器实例 + if cam_id not in self.detectors: + camera_config = self.cameras.get(cam_id) + params = camera_config.params if camera_config else None + self.detectors[cam_id] = self._create_detector(params) + detector = self.detectors[cam_id] + + # 执行检测 + result = detector.process_frame(frame.copy(), cam_id, ts) + result_img = result["image"] + result_alerts = result["alerts"] + + # 过滤重复告警 + push_actions = self._filter_duplicate_alerts( + cam_id, result_alerts, time.time() + ) + + # 编码图像 + try: + img_b64 = self._encode_image_to_base64(result_img) + except Exception as e: + logger.error(f"[ERROR] Encode image failed: {e}") + img_b64 = None + + # 推送结果 + if img_b64 is not None: + msg = { + "msg_type": "frame", + "camera_id": item["camera_index"], + "timestamp": ts, + "result_type": push_actions, + "image_base64": img_b64, + } + try: + self.ws_queue.put(msg, timeout=1.0) + if push_actions and len(push_actions) > 0: + # 异步发送 POST 请求(提交到线程池) + post_msg = msg.copy() + post_msg['type'] = self.POST_TYPE + self.post_executor.submit(self._post_alert, post_msg) + except queue.Full: + logger.warning("[WARN] ws_send_queue full, drop frame message") + + except Exception as e: + logger.error( + f"[ERROR] Frame processing failed for camera {cam_id if 'cam_id' in locals() else 'unknown'}: {e}") + logger.exception("Exception details:") + finally: + self.raw_queue.task_done() + + # 线程退出时关闭线程池 + self.post_executor.shutdown(wait=False) diff --git a/biz/checkpoint/checkpoint_biz.py b/biz/checkpoint/checkpoint_biz.py index b1a893c..c77818b 100644 --- a/biz/checkpoint/checkpoint_biz.py +++ b/biz/checkpoint/checkpoint_biz.py @@ -1,14 +1,11 @@ import cv2 import numpy as np -import base64 from typing import Dict, Any import threading -import time import queue -import requests -from concurrent.futures import ThreadPoolExecutor -from common.contants import ALERT_PUSH_URL + +from biz.base_frame_processor import BaseFrameProcessorWorker # -------------------------- Kadian 检测相关导入 -------------------------- from algorithm.common.npu_yolo_onnx_person_car_phone import YOLOv8_ONNX # 主检测模型(人/车/后备箱/手机) @@ -813,135 +810,10 @@ class KadianDetector: # ========================= 帧处理线程 ========================= -class FrameProcessorWorker(threading.Thread): - def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None, post_workers: int = 4): - super().__init__(daemon=True) - self.raw_queue = raw_queue - self.ws_queue = ws_queue - self.stop_event = stop_event - # 将摄像头列表转换为字典,key为id,方便通过camera_id快速查找 - self.cameras = {cam.id: cam for cam in cameras} if cameras is not None else {} - - self.last_ts: Dict[int, float] = {} - - # 每个摄像头一个独立的 Kadian 检测器实例 - self.kadian_detectors: Dict[int, KadianDetector] = {} - - self.last_alert_push_time: Dict[int, Dict[str, float]] = {} - - # 线程池用于异步发送 POST 请求 - self.post_executor = ThreadPoolExecutor(max_workers=post_workers, thread_name_prefix="alert_post") - - def _post_alert(self, msg: dict): - """异步发送告警 POST 请求(在线程池中执行)""" - try: - response = requests.post(ALERT_PUSH_URL, json=msg, timeout=5.0) - if response.status_code == 200: - print(f"[INFO] POST alert sent successfully for actions: {msg.get('result_type')}") - else: - print(f"[WARN] POST alert failed with status: {response.status_code}") - except Exception as e: - print(f"[ERROR] POST alert request failed: {e}") - - def _encode_base64(self, img): - _, buf = cv2.imencode(".jpg", img) - return base64.b64encode(buf).decode("ascii") - - def run(self): - target_interval = 1.0 / RTSP_TARGET_FPS - while not self.stop_event.is_set(): - try: - item = self.raw_queue.get(timeout=0.5) - except queue.Empty: - continue - - try: - cam_id = item["camera_id"] - ts = item["timestamp"] - frame = item["frame"] - - # 抽帧控制 - if ts - self.last_ts.get(cam_id, 0) < target_interval: - # self.raw_queue.task_done() - continue - self.last_ts[cam_id] = ts - - # 获取检测器实例 - if cam_id not in self.kadian_detectors: - camera_config = self.cameras.get(cam_id) - params = camera_config.params if camera_config else None - self.kadian_detectors[cam_id] = KadianDetector(params) - detector = self.kadian_detectors[cam_id] - - # 执行检测 - # detect_start = time.time() - result = detector.process_frame(frame.copy(), cam_id, ts) - # detect_time = (time.time() - detect_start) * 1000 - - result_img = result["image"] - result_type = result["alerts"] - # logger.debug(f"alerts: {result_type}") - - # ========= 核心修改:过滤5秒内重复的action ========= - # 初始化当前摄像头的推送时间记录 - if cam_id not in self.last_alert_push_time: - self.last_alert_push_time[cam_id] = {} - - # 筛选出符合推送条件的action(5秒内未推送过) - push_actions = [] - current_time = time.time() - for alert in result_type: - action = alert['action'] - last_push = self.last_alert_push_time[cam_id].get(action, 0) - # 检查是否超过推送间隔 - if current_time - last_push >= ALERT_PUSH_INTERVAL: - push_actions.append(action) - # 更新该action的最后推送时间 - self.last_alert_push_time[cam_id][action] = current_time - - # 通过 WebSocket 发送帧结果 - # encode_start = time.time() - try: - img_b64 = self._encode_base64(result_img) - except Exception as e: - logger.error(f"[ERROR] Encode image failed: {e}") - img_b64 = None - # encode_time = (time.time() - encode_start) * 1000 - - if img_b64 is not None: - # 将abnormal_actions对象数组转换为字符串数组 - # action_names = [action_info['action'] for action_info in push_actions] - - msg = { - "msg_type": "frame", - "camera_id": item["camera_index"], - "timestamp": ts, - # "result_type": action_names, - "result_type": push_actions, - "image_base64": img_b64, - } - try: - self.ws_queue.put(msg, timeout=1.0) - if push_actions and len(push_actions) > 0: - # 异步发送 POST 请求(提交到线程池) - post_msg = msg.copy() - post_msg['type'] = 1 - self.post_executor.submit(self._post_alert, post_msg) - except queue.Full: - logger.warning("[WARN] ws_send_queue full, drop frame message") - - # # 打印关键操作的耗时 - # total_time = detect_time + encode_time - # logger.info(f"[PERF] Camera {cam_id} - Total: {total_time:.1f}ms | " - # f"Detect: {detect_time:.1f}ms | " - # f"Encode: {encode_time:.1f}ms | ") - - except Exception as e: - logger.error(f"[ERROR] Frame processing failed for camera {cam_id if 'cam_id' in locals() else 'unknown'}: {e}") - logger.exception("Exception details:") # 打印完整的堆栈跟踪 - # 继续处理下一帧,不要退出循环 - finally: - self.raw_queue.task_done() - - # 线程退出时关闭线程池 - self.post_executor.shutdown(wait=False) +class FrameProcessorWorker(BaseFrameProcessorWorker): + """卡点检测帧处理线程""" + + # 子类配置 + DETECTOR_FACTORY = lambda params: KadianDetector(params) + POST_TYPE = 1 + TARGET_FPS = RTSP_TARGET_FPS diff --git a/biz/prison/supervision_room_biz.py b/biz/prison/supervision_room_biz.py index ca93869..f504c67 100644 --- a/biz/prison/supervision_room_biz.py +++ b/biz/prison/supervision_room_biz.py @@ -5,17 +5,14 @@ import cv2 import numpy as np import os -import time import threading import queue import yaml import json -import base64 from typing import Dict, Any, Tuple, List -from common.contants import ALERT_PUSH_URL -import requests +from biz.base_frame_processor import BaseFrameProcessorWorker # -------------------------- Kadian 检测相关导入 -------------------------- from algorithm.common.npu_yolo_onnx_person_car_phone import YOLOv8_ONNX # 主检测模型(人/车/后备箱/手机) @@ -496,126 +493,11 @@ class ZhihuishiDetector: # ========================= 帧处理线程 ========================= -class FrameProcessorWorker(threading.Thread): - def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None): - super().__init__(daemon=True) - self.raw_queue = raw_queue - self.ws_queue = ws_queue - self.stop_event = stop_event - self.cameras = {cam.id: cam for cam in cameras} if cameras is not None else {} - - self.last_ts: Dict[int, float] = {} - - # 每个摄像头一个独立的 Kadian 检测器实例 - self.kadian_detectors: Dict[int, ZhihuishiDetector] = {} - - self.last_alert_push_time: Dict[int, Dict[str, float]] = {} - - def _encode_base64(self, img): - _, buf = cv2.imencode(".jpg", img) - return base64.b64encode(buf).decode("ascii") - - def run(self): - target_interval = 1.0 / RTSP_TARGET_FPS - while not self.stop_event.is_set(): - try: - item = self.raw_queue.get(timeout=0.5) - except queue.Empty: - continue - - try: - cam_id = item["camera_id"] - ts = item["timestamp"] - frame = item["frame"] - - # 抽帧控制 - if ts - self.last_ts.get(cam_id, 0) < target_interval: - # self.raw_queue.task_done() - continue - self.last_ts[cam_id] = ts - - # 获取检测器实例 - if cam_id not in self.kadian_detectors: - camera_config = self.cameras.get(cam_id) - params = camera_config.params if camera_config else None - self.kadian_detectors[cam_id] = ZhihuishiDetector(params) - detector = self.kadian_detectors[cam_id] - - # 执行检测 - # detect_start = time.time() - result = detector.process_frame(frame.copy(), cam_id, ts) - # detect_time = (time.time() - detect_start) * 1000 - - result_img = result["image"] - result_type = result["alerts"] - # logger.debug(f"alerts: {result_type}") - - # ========= 核心修改:过滤5秒内重复的action ========= - # 初始化当前摄像头的推送时间记录 - if cam_id not in self.last_alert_push_time: - self.last_alert_push_time[cam_id] = {} - - # 筛选出符合推送条件的action(5秒内未推送过) - push_actions = [] - current_time = time.time() - for alert in result_type: - action = alert['action'] - last_push = self.last_alert_push_time[cam_id].get(action, 0) - # 检查是否超过推送间隔 - if current_time - last_push >= ALERT_PUSH_INTERVAL: - push_actions.append(action) - # 更新该action的最后推送时间 - self.last_alert_push_time[cam_id][action] = current_time - - # 通过 WebSocket 发送帧结果 - # encode_start = time.time() - try: - img_b64 = self._encode_base64(result_img) - except Exception as e: - logger.error(f"[ERROR] Encode image failed: {e}") - img_b64 = None - # encode_time = (time.time() - encode_start) * 1000 - - if img_b64 is not None: - # 将abnormal_actions对象数组转换为字符串数组 - # action_names = [action_info['action'] for action_info in push_actions] - - msg = { - "msg_type": "frame", - "camera_id": item["camera_index"], - "timestamp": ts, - # "result_type": action_names, - "result_type": push_actions, - "image_base64": img_b64, - } - try: - self.ws_queue.put(msg, timeout=1.0) - if push_actions and len(push_actions) > 0: - # 发送POST请求 - post_msg = msg.copy() - post_msg['type'] = 2 - try: - response = requests.post(ALERT_PUSH_URL, json=post_msg, timeout=5.0) - if response.status_code == 200: - print(f"[INFO] POST alert sent successfully for actions: {push_actions}") - else: - print(f"[WARN] POST alert failed with status: {response.status_code}") - except Exception as e: - print(f"[ERROR] POST alert request failed: {e}") - except queue.Full: - logger.warning("[WARN] ws_send_queue full, drop frame message") - - # # 打印关键操作的耗时 - # total_time = detect_time + encode_time - # logger.info(f"[PERF] Camera {cam_id} - Total: {total_time:.1f}ms | " - # f"Detect: {detect_time:.1f}ms | " - # f"Encode: {encode_time:.1f}ms | ") - - except Exception as e: - logger.error( - f"[ERROR] Frame processing failed for camera {cam_id if 'cam_id' in locals() else 'unknown'}: {e}") - logger.exception("Exception details:") # 打印完整的堆栈跟踪 - # 继续处理下一帧,不要退出循环 - finally: - self.raw_queue.task_done() +class FrameProcessorWorker(BaseFrameProcessorWorker): + """监控室检测帧处理线程""" + + # 子类配置 + DETECTOR_FACTORY = lambda params: ZhihuishiDetector(params) + POST_TYPE = 2 + TARGET_FPS = RTSP_TARGET_FPS diff --git a/biz/prison/trajectory02_biz.py b/biz/prison/trajectory02_biz.py index 6762450..bc9fb0a 100644 --- a/biz/prison/trajectory02_biz.py +++ b/biz/prison/trajectory02_biz.py @@ -6,19 +6,17 @@ import cv2 import numpy as np import os -import time import threading import queue import yaml import json -import base64 import asyncio import websockets from dataclasses import dataclass from typing import Dict, Any, Tuple, List from datetime import datetime -from common.contants import ALERT_PUSH_URL -import requests + +from biz.base_frame_processor import BaseFrameProcessorWorker # -------------------------- Kadian 检测相关导入 -------------------------- @@ -480,118 +478,10 @@ class TrajectoryDetector: # ========================= 帧处理线程 ========================= -class FrameProcessorWorker(threading.Thread): - def __init__(self, - raw_frame_queue: "queue.Queue[Dict[str, Any]]", - ws_send_queue: "queue.Queue[Dict[str, Any]]", - stop_event: threading.Event, - cameras=None): - super().__init__(daemon=True) - self.raw_queue = raw_frame_queue - self.ws_queue = ws_send_queue - self.stop_event = stop_event - # 将摄像头列表转换为字典,key为id,方便通过camera_id快速查找 - self.cameras = {cam.id: cam for cam in cameras} if cameras is not None else {} - - self.last_ts: Dict[int, float] = {} - - # 每个摄像头一个独立的 Kadian 检测器实例 - self.trajectory_detectors: Dict[int, TrajectoryDetector] = {} - - # 新增:维护每个摄像头每个action的最后推送时间 {camera_id: {action: last_push_time}} - self.last_alert_push_time: Dict[int, Dict[str, float]] = {} - - def _encode_image_to_base64(self, image) -> str: - ok, buf = cv2.imencode(".jpg", image) - if not ok: - raise RuntimeError("Failed to encode image to JPEG") - return base64.b64encode(buf.tobytes()).decode("ascii") - - def run(self): - target_interval = 1.0 / RTSP_TARGET_FPS - while not self.stop_event.is_set(): - try: - item = self.raw_queue.get(timeout=0.5) - except queue.Empty: - continue - - try: - cam_id = item["camera_id"] - ts = item["timestamp"] - frame = item["frame"] - - # 抽帧控制 - if ts - self.last_ts.get(cam_id, 0) < target_interval: - # self.raw_queue.task_done() - continue - self.last_ts[cam_id] = ts - - # 获取检测器实例 - if cam_id not in self.trajectory_detectors: - camera_config = self.cameras.get(cam_id) - params = camera_config.params if camera_config else None - self.trajectory_detectors[cam_id] = TrajectoryDetector(params) - detector = self.trajectory_detectors[cam_id] - - # 执行检测 - result = detector.process_frame(frame.copy(), cam_id, ts) - - result_img = result["image"] - result_type = result["alerts"] - - # ========= 核心修改:过滤5秒内重复的action ========= - # 初始化当前摄像头的推送时间记录 - if cam_id not in self.last_alert_push_time: - self.last_alert_push_time[cam_id] = {} - - # 筛选出符合推送条件的action(5秒内未推送过) - push_actions = [] - current_time = time.time() - for alert in result_type: - action = alert['action'] - last_push = self.last_alert_push_time[cam_id].get(action, 0) - # 检查是否超过推送间隔 - if current_time - last_push >= ALERT_PUSH_INTERVAL: - push_actions.append(action) - # 更新该action的最后推送时间 - self.last_alert_push_time[cam_id][action] = current_time - - # 通过 WebSocket 发送帧结果 - try: - img_b64 = self._encode_image_to_base64(result_img) - except Exception as e: - logger.error(f"[ERROR] Encode image failed: {e}") - img_b64 = None - - if img_b64 is not None: - msg = { - "msg_type": "frame", - "camera_id": item["camera_index"], - "timestamp": ts, - "result_type": push_actions, - "image_base64": img_b64, - } - try: - self.ws_queue.put(msg, timeout=1.0) - if push_actions and len(push_actions) > 0: - # 发送POST请求 - post_msg = msg.copy() - post_msg['type'] = 2 - try: - response = requests.post(ALERT_PUSH_URL, json=post_msg, timeout=5.0) - if response.status_code == 200: - print(f"[INFO] POST alert sent successfully for actions: {push_actions}") - else: - print(f"[WARN] POST alert failed with status: {response.status_code}") - except Exception as e: - print(f"[ERROR] POST alert request failed: {e}") - except queue.Full: - logger.warning("[WARN] ws_send_queue full, drop frame message") - - except Exception as e: - logger.error( - f"[ERROR] Frame processing failed for camera {cam_id if 'cam_id' in locals() else 'unknown'}: {e}") - logger.exception("Exception details:") # 打印完整的堆栈跟踪 - # 继续处理下一帧,不要退出循环 - finally: - self.raw_queue.task_done() +class FrameProcessorWorker(BaseFrameProcessorWorker): + """轨迹检测帧处理线程""" + + # 子类配置 + DETECTOR_FACTORY = lambda params: TrajectoryDetector(params) + POST_TYPE = 2 + TARGET_FPS = RTSP_TARGET_FPS