提取frame processor到base

This commit is contained in:
zqc
2026-03-04 18:17:02 +08:00
parent 005f77942f
commit 4d010e45e3
4 changed files with 221 additions and 382 deletions

195
biz/base_frame_processor.py Normal file
View File

@@ -0,0 +1,195 @@
# base_frame_processor.py
# 通用帧处理工作线程基类
# 统一异步POST、告警去重、抽帧控制等公共逻辑
import cv2
import base64
import time
import threading
import queue
import requests
from typing import Dict, Any, Callable
from concurrent.futures import ThreadPoolExecutor
from common.contants import ALERT_PUSH_URL
from utils.logger import get_logger
logger = get_logger(__name__)
# 告警推送频率限制(秒)
ALERT_PUSH_INTERVAL = 5.0
class BaseFrameProcessorWorker(threading.Thread):
"""
通用帧处理工作线程基类
功能:
- 统一异步POST线程池
- 统一告警去重逻辑
- 统一抽帧控制
- 统一异常处理
子类仅需提供:
- DETECTOR_FACTORY: 检测器工厂函数
- POST_TYPE: POST请求的type值
- TARGET_FPS: 目标帧率
"""
# ========== 子类必须重写的类常量 ==========
DETECTOR_FACTORY: Callable = None # 检测器工厂函数
POST_TYPE: int = 2 # POST type
TARGET_FPS: float = 10.0 # 目标帧率
def __init__(self,
raw_queue: queue.Queue,
ws_queue: queue.Queue,
stop_event: threading.Event,
cameras=None,
post_workers: int = 4):
super().__init__(daemon=True)
self.raw_queue = raw_queue
self.ws_queue = ws_queue
self.stop_event = stop_event
# 将摄像头列表转换为字典key为id方便通过camera_id快速查找
self.cameras = {cam.id: cam for cam in cameras} if cameras is not None else {}
self.last_ts: Dict[int, float] = {}
# 检测器实例缓存
self.detectors: Dict[int, Any] = {}
# 告警去重记录 {camera_id: {action: last_push_time}}
self.last_alert_push_time: Dict[int, Dict[str, float]] = {}
# 异步POST线程池
self.post_executor = ThreadPoolExecutor(
max_workers=post_workers,
thread_name_prefix="alert_post"
)
def _encode_image_to_base64(self, img) -> str:
"""图像编码为 Base64"""
ok, buf = cv2.imencode(".jpg", img)
if not ok:
raise RuntimeError("Failed to encode image to JPEG")
return base64.b64encode(buf.tobytes()).decode("ascii")
def _post_alert(self, msg: dict):
"""异步发送告警 POST 请求(在线程池中执行)"""
try:
response = requests.post(ALERT_PUSH_URL, json=msg, timeout=5.0)
if response.status_code == 200:
print(f"[INFO] POST alert sent successfully for actions: {msg.get('result_type')}")
else:
print(f"[WARN] POST alert failed with status: {response.status_code}")
except Exception as e:
print(f"[ERROR] POST alert request failed: {e}")
def _create_detector(self, params):
"""创建检测器实例"""
# 使用 type(self) 访问类属性,避免 lambda 被绑定 self 参数
factory = type(self).DETECTOR_FACTORY
if factory is None:
raise NotImplementedError("子类必须提供 DETECTOR_FACTORY")
return factory(params)
def _filter_duplicate_alerts(self, cam_id: int, alerts: list, current_time: float) -> list:
"""
过滤5秒内重复的告警
Args:
cam_id: 摄像头ID
alerts: 当前帧的告警列表
current_time: 当前时间戳
Returns:
符合推送条件的action列表
"""
if cam_id not in self.last_alert_push_time:
self.last_alert_push_time[cam_id] = {}
push_actions = []
for alert in alerts:
action = alert['action']
last_push = self.last_alert_push_time[cam_id].get(action, 0)
# 检查是否超过推送间隔
if current_time - last_push >= ALERT_PUSH_INTERVAL:
push_actions.append(action)
# 更新该action的最后推送时间
self.last_alert_push_time[cam_id][action] = current_time
return push_actions
def run(self):
"""主循环 - 模板方法"""
target_interval = 1.0 / self.TARGET_FPS
while not self.stop_event.is_set():
try:
item = self.raw_queue.get(timeout=0.5)
except queue.Empty:
continue
try:
cam_id = item["camera_id"]
ts = item["timestamp"]
frame = item["frame"]
# 抽帧控制
if ts - self.last_ts.get(cam_id, 0) < target_interval:
continue
self.last_ts[cam_id] = ts
# 获取或创建检测器实例
if cam_id not in self.detectors:
camera_config = self.cameras.get(cam_id)
params = camera_config.params if camera_config else None
self.detectors[cam_id] = self._create_detector(params)
detector = self.detectors[cam_id]
# 执行检测
result = detector.process_frame(frame.copy(), cam_id, ts)
result_img = result["image"]
result_alerts = result["alerts"]
# 过滤重复告警
push_actions = self._filter_duplicate_alerts(
cam_id, result_alerts, time.time()
)
# 编码图像
try:
img_b64 = self._encode_image_to_base64(result_img)
except Exception as e:
logger.error(f"[ERROR] Encode image failed: {e}")
img_b64 = None
# 推送结果
if img_b64 is not None:
msg = {
"msg_type": "frame",
"camera_id": item["camera_index"],
"timestamp": ts,
"result_type": push_actions,
"image_base64": img_b64,
}
try:
self.ws_queue.put(msg, timeout=1.0)
if push_actions and len(push_actions) > 0:
# 异步发送 POST 请求(提交到线程池)
post_msg = msg.copy()
post_msg['type'] = self.POST_TYPE
self.post_executor.submit(self._post_alert, post_msg)
except queue.Full:
logger.warning("[WARN] ws_send_queue full, drop frame message")
except Exception as e:
logger.error(
f"[ERROR] Frame processing failed for camera {cam_id if 'cam_id' in locals() else 'unknown'}: {e}")
logger.exception("Exception details:")
finally:
self.raw_queue.task_done()
# 线程退出时关闭线程池
self.post_executor.shutdown(wait=False)

View File

@@ -1,14 +1,11 @@
import cv2
import numpy as np
import base64
from typing import Dict, Any
import threading
import time
import queue
import requests
from concurrent.futures import ThreadPoolExecutor
from common.contants import ALERT_PUSH_URL
from biz.base_frame_processor import BaseFrameProcessorWorker
# -------------------------- Kadian 检测相关导入 --------------------------
from algorithm.common.npu_yolo_onnx_person_car_phone import YOLOv8_ONNX # 主检测模型(人/车/后备箱/手机)
@@ -813,135 +810,10 @@ class KadianDetector:
# ========================= 帧处理线程 =========================
class FrameProcessorWorker(threading.Thread):
def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None, post_workers: int = 4):
super().__init__(daemon=True)
self.raw_queue = raw_queue
self.ws_queue = ws_queue
self.stop_event = stop_event
# 将摄像头列表转换为字典key为id方便通过camera_id快速查找
self.cameras = {cam.id: cam for cam in cameras} if cameras is not None else {}
self.last_ts: Dict[int, float] = {}
# 每个摄像头一个独立的 Kadian 检测器实例
self.kadian_detectors: Dict[int, KadianDetector] = {}
self.last_alert_push_time: Dict[int, Dict[str, float]] = {}
# 线程池用于异步发送 POST 请求
self.post_executor = ThreadPoolExecutor(max_workers=post_workers, thread_name_prefix="alert_post")
def _post_alert(self, msg: dict):
"""异步发送告警 POST 请求(在线程池中执行)"""
try:
response = requests.post(ALERT_PUSH_URL, json=msg, timeout=5.0)
if response.status_code == 200:
print(f"[INFO] POST alert sent successfully for actions: {msg.get('result_type')}")
else:
print(f"[WARN] POST alert failed with status: {response.status_code}")
except Exception as e:
print(f"[ERROR] POST alert request failed: {e}")
def _encode_base64(self, img):
_, buf = cv2.imencode(".jpg", img)
return base64.b64encode(buf).decode("ascii")
def run(self):
target_interval = 1.0 / RTSP_TARGET_FPS
while not self.stop_event.is_set():
try:
item = self.raw_queue.get(timeout=0.5)
except queue.Empty:
continue
try:
cam_id = item["camera_id"]
ts = item["timestamp"]
frame = item["frame"]
# 抽帧控制
if ts - self.last_ts.get(cam_id, 0) < target_interval:
# self.raw_queue.task_done()
continue
self.last_ts[cam_id] = ts
# 获取检测器实例
if cam_id not in self.kadian_detectors:
camera_config = self.cameras.get(cam_id)
params = camera_config.params if camera_config else None
self.kadian_detectors[cam_id] = KadianDetector(params)
detector = self.kadian_detectors[cam_id]
# 执行检测
# detect_start = time.time()
result = detector.process_frame(frame.copy(), cam_id, ts)
# detect_time = (time.time() - detect_start) * 1000
result_img = result["image"]
result_type = result["alerts"]
# logger.debug(f"alerts: {result_type}")
# ========= 核心修改过滤5秒内重复的action =========
# 初始化当前摄像头的推送时间记录
if cam_id not in self.last_alert_push_time:
self.last_alert_push_time[cam_id] = {}
# 筛选出符合推送条件的action5秒内未推送过
push_actions = []
current_time = time.time()
for alert in result_type:
action = alert['action']
last_push = self.last_alert_push_time[cam_id].get(action, 0)
# 检查是否超过推送间隔
if current_time - last_push >= ALERT_PUSH_INTERVAL:
push_actions.append(action)
# 更新该action的最后推送时间
self.last_alert_push_time[cam_id][action] = current_time
# 通过 WebSocket 发送帧结果
# encode_start = time.time()
try:
img_b64 = self._encode_base64(result_img)
except Exception as e:
logger.error(f"[ERROR] Encode image failed: {e}")
img_b64 = None
# encode_time = (time.time() - encode_start) * 1000
if img_b64 is not None:
# 将abnormal_actions对象数组转换为字符串数组
# action_names = [action_info['action'] for action_info in push_actions]
msg = {
"msg_type": "frame",
"camera_id": item["camera_index"],
"timestamp": ts,
# "result_type": action_names,
"result_type": push_actions,
"image_base64": img_b64,
}
try:
self.ws_queue.put(msg, timeout=1.0)
if push_actions and len(push_actions) > 0:
# 异步发送 POST 请求(提交到线程池)
post_msg = msg.copy()
post_msg['type'] = 1
self.post_executor.submit(self._post_alert, post_msg)
except queue.Full:
logger.warning("[WARN] ws_send_queue full, drop frame message")
# # 打印关键操作的耗时
# total_time = detect_time + encode_time
# logger.info(f"[PERF] Camera {cam_id} - Total: {total_time:.1f}ms | "
# f"Detect: {detect_time:.1f}ms | "
# f"Encode: {encode_time:.1f}ms | ")
except Exception as e:
logger.error(f"[ERROR] Frame processing failed for camera {cam_id if 'cam_id' in locals() else 'unknown'}: {e}")
logger.exception("Exception details:") # 打印完整的堆栈跟踪
# 继续处理下一帧,不要退出循环
finally:
self.raw_queue.task_done()
# 线程退出时关闭线程池
self.post_executor.shutdown(wait=False)
class FrameProcessorWorker(BaseFrameProcessorWorker):
"""卡点检测帧处理线程"""
# 子类配置
DETECTOR_FACTORY = lambda params: KadianDetector(params)
POST_TYPE = 1
TARGET_FPS = RTSP_TARGET_FPS

View File

@@ -5,17 +5,14 @@
import cv2
import numpy as np
import os
import time
import threading
import queue
import yaml
import json
import base64
from typing import Dict, Any, Tuple, List
from common.contants import ALERT_PUSH_URL
import requests
from biz.base_frame_processor import BaseFrameProcessorWorker
# -------------------------- Kadian 检测相关导入 --------------------------
from algorithm.common.npu_yolo_onnx_person_car_phone import YOLOv8_ONNX # 主检测模型(人/车/后备箱/手机)
@@ -496,126 +493,11 @@ class ZhihuishiDetector:
# ========================= 帧处理线程 =========================
class FrameProcessorWorker(threading.Thread):
def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None):
super().__init__(daemon=True)
self.raw_queue = raw_queue
self.ws_queue = ws_queue
self.stop_event = stop_event
self.cameras = {cam.id: cam for cam in cameras} if cameras is not None else {}
self.last_ts: Dict[int, float] = {}
# 每个摄像头一个独立的 Kadian 检测器实例
self.kadian_detectors: Dict[int, ZhihuishiDetector] = {}
self.last_alert_push_time: Dict[int, Dict[str, float]] = {}
def _encode_base64(self, img):
_, buf = cv2.imencode(".jpg", img)
return base64.b64encode(buf).decode("ascii")
def run(self):
target_interval = 1.0 / RTSP_TARGET_FPS
while not self.stop_event.is_set():
try:
item = self.raw_queue.get(timeout=0.5)
except queue.Empty:
continue
try:
cam_id = item["camera_id"]
ts = item["timestamp"]
frame = item["frame"]
# 抽帧控制
if ts - self.last_ts.get(cam_id, 0) < target_interval:
# self.raw_queue.task_done()
continue
self.last_ts[cam_id] = ts
# 获取检测器实例
if cam_id not in self.kadian_detectors:
camera_config = self.cameras.get(cam_id)
params = camera_config.params if camera_config else None
self.kadian_detectors[cam_id] = ZhihuishiDetector(params)
detector = self.kadian_detectors[cam_id]
# 执行检测
# detect_start = time.time()
result = detector.process_frame(frame.copy(), cam_id, ts)
# detect_time = (time.time() - detect_start) * 1000
result_img = result["image"]
result_type = result["alerts"]
# logger.debug(f"alerts: {result_type}")
# ========= 核心修改过滤5秒内重复的action =========
# 初始化当前摄像头的推送时间记录
if cam_id not in self.last_alert_push_time:
self.last_alert_push_time[cam_id] = {}
# 筛选出符合推送条件的action5秒内未推送过
push_actions = []
current_time = time.time()
for alert in result_type:
action = alert['action']
last_push = self.last_alert_push_time[cam_id].get(action, 0)
# 检查是否超过推送间隔
if current_time - last_push >= ALERT_PUSH_INTERVAL:
push_actions.append(action)
# 更新该action的最后推送时间
self.last_alert_push_time[cam_id][action] = current_time
# 通过 WebSocket 发送帧结果
# encode_start = time.time()
try:
img_b64 = self._encode_base64(result_img)
except Exception as e:
logger.error(f"[ERROR] Encode image failed: {e}")
img_b64 = None
# encode_time = (time.time() - encode_start) * 1000
if img_b64 is not None:
# 将abnormal_actions对象数组转换为字符串数组
# action_names = [action_info['action'] for action_info in push_actions]
msg = {
"msg_type": "frame",
"camera_id": item["camera_index"],
"timestamp": ts,
# "result_type": action_names,
"result_type": push_actions,
"image_base64": img_b64,
}
try:
self.ws_queue.put(msg, timeout=1.0)
if push_actions and len(push_actions) > 0:
# 发送POST请求
post_msg = msg.copy()
post_msg['type'] = 2
try:
response = requests.post(ALERT_PUSH_URL, json=post_msg, timeout=5.0)
if response.status_code == 200:
print(f"[INFO] POST alert sent successfully for actions: {push_actions}")
else:
print(f"[WARN] POST alert failed with status: {response.status_code}")
except Exception as e:
print(f"[ERROR] POST alert request failed: {e}")
except queue.Full:
logger.warning("[WARN] ws_send_queue full, drop frame message")
# # 打印关键操作的耗时
# total_time = detect_time + encode_time
# logger.info(f"[PERF] Camera {cam_id} - Total: {total_time:.1f}ms | "
# f"Detect: {detect_time:.1f}ms | "
# f"Encode: {encode_time:.1f}ms | ")
except Exception as e:
logger.error(
f"[ERROR] Frame processing failed for camera {cam_id if 'cam_id' in locals() else 'unknown'}: {e}")
logger.exception("Exception details:") # 打印完整的堆栈跟踪
# 继续处理下一帧,不要退出循环
finally:
self.raw_queue.task_done()
class FrameProcessorWorker(BaseFrameProcessorWorker):
"""监控室检测帧处理线程"""
# 子类配置
DETECTOR_FACTORY = lambda params: ZhihuishiDetector(params)
POST_TYPE = 2
TARGET_FPS = RTSP_TARGET_FPS

View File

@@ -6,19 +6,17 @@
import cv2
import numpy as np
import os
import time
import threading
import queue
import yaml
import json
import base64
import asyncio
import websockets
from dataclasses import dataclass
from typing import Dict, Any, Tuple, List
from datetime import datetime
from common.contants import ALERT_PUSH_URL
import requests
from biz.base_frame_processor import BaseFrameProcessorWorker
# -------------------------- Kadian 检测相关导入 --------------------------
@@ -480,118 +478,10 @@ class TrajectoryDetector:
# ========================= 帧处理线程 =========================
class FrameProcessorWorker(threading.Thread):
def __init__(self,
raw_frame_queue: "queue.Queue[Dict[str, Any]]",
ws_send_queue: "queue.Queue[Dict[str, Any]]",
stop_event: threading.Event,
cameras=None):
super().__init__(daemon=True)
self.raw_queue = raw_frame_queue
self.ws_queue = ws_send_queue
self.stop_event = stop_event
# 将摄像头列表转换为字典key为id方便通过camera_id快速查找
self.cameras = {cam.id: cam for cam in cameras} if cameras is not None else {}
self.last_ts: Dict[int, float] = {}
# 每个摄像头一个独立的 Kadian 检测器实例
self.trajectory_detectors: Dict[int, TrajectoryDetector] = {}
# 新增维护每个摄像头每个action的最后推送时间 {camera_id: {action: last_push_time}}
self.last_alert_push_time: Dict[int, Dict[str, float]] = {}
def _encode_image_to_base64(self, image) -> str:
ok, buf = cv2.imencode(".jpg", image)
if not ok:
raise RuntimeError("Failed to encode image to JPEG")
return base64.b64encode(buf.tobytes()).decode("ascii")
def run(self):
target_interval = 1.0 / RTSP_TARGET_FPS
while not self.stop_event.is_set():
try:
item = self.raw_queue.get(timeout=0.5)
except queue.Empty:
continue
try:
cam_id = item["camera_id"]
ts = item["timestamp"]
frame = item["frame"]
# 抽帧控制
if ts - self.last_ts.get(cam_id, 0) < target_interval:
# self.raw_queue.task_done()
continue
self.last_ts[cam_id] = ts
# 获取检测器实例
if cam_id not in self.trajectory_detectors:
camera_config = self.cameras.get(cam_id)
params = camera_config.params if camera_config else None
self.trajectory_detectors[cam_id] = TrajectoryDetector(params)
detector = self.trajectory_detectors[cam_id]
# 执行检测
result = detector.process_frame(frame.copy(), cam_id, ts)
result_img = result["image"]
result_type = result["alerts"]
# ========= 核心修改过滤5秒内重复的action =========
# 初始化当前摄像头的推送时间记录
if cam_id not in self.last_alert_push_time:
self.last_alert_push_time[cam_id] = {}
# 筛选出符合推送条件的action5秒内未推送过
push_actions = []
current_time = time.time()
for alert in result_type:
action = alert['action']
last_push = self.last_alert_push_time[cam_id].get(action, 0)
# 检查是否超过推送间隔
if current_time - last_push >= ALERT_PUSH_INTERVAL:
push_actions.append(action)
# 更新该action的最后推送时间
self.last_alert_push_time[cam_id][action] = current_time
# 通过 WebSocket 发送帧结果
try:
img_b64 = self._encode_image_to_base64(result_img)
except Exception as e:
logger.error(f"[ERROR] Encode image failed: {e}")
img_b64 = None
if img_b64 is not None:
msg = {
"msg_type": "frame",
"camera_id": item["camera_index"],
"timestamp": ts,
"result_type": push_actions,
"image_base64": img_b64,
}
try:
self.ws_queue.put(msg, timeout=1.0)
if push_actions and len(push_actions) > 0:
# 发送POST请求
post_msg = msg.copy()
post_msg['type'] = 2
try:
response = requests.post(ALERT_PUSH_URL, json=post_msg, timeout=5.0)
if response.status_code == 200:
print(f"[INFO] POST alert sent successfully for actions: {push_actions}")
else:
print(f"[WARN] POST alert failed with status: {response.status_code}")
except Exception as e:
print(f"[ERROR] POST alert request failed: {e}")
except queue.Full:
logger.warning("[WARN] ws_send_queue full, drop frame message")
except Exception as e:
logger.error(
f"[ERROR] Frame processing failed for camera {cam_id if 'cam_id' in locals() else 'unknown'}: {e}")
logger.exception("Exception details:") # 打印完整的堆栈跟踪
# 继续处理下一帧,不要退出循环
finally:
self.raw_queue.task_done()
class FrameProcessorWorker(BaseFrameProcessorWorker):
"""轨迹检测帧处理线程"""
# 子类配置
DETECTOR_FACTORY = lambda params: TrajectoryDetector(params)
POST_TYPE = 2
TARGET_FPS = RTSP_TARGET_FPS