This commit is contained in:
2026-03-09 17:55:06 +08:00

View File

@@ -86,31 +86,50 @@ class BaseFrameProcessorWorker(threading.Thread):
raise RuntimeError("Failed to encode image to JPEG") raise RuntimeError("Failed to encode image to JPEG")
return base64.b64encode(buf.tobytes()).decode("ascii") return base64.b64encode(buf.tobytes()).decode("ascii")
def _expand_msg_by_result_type(self, msg: dict) -> list: def _expand_msg_by_ori_alert(self, msg: dict) -> list:
""" """
将 msg 中的 result_type 从数组展开为多个独立的 msg 将 msg 中的 ori_alert 数组展开为多个独立的 msg
Args: Args:
msg: 原始消息,result_type 为 action code 字符串数组 msg: 原始消息,包含 ori_alert 数组
original_image_b64: 原始图像的 base64 编码(作为后备)
Returns: Returns:
msg 列表,每个 msg 的 result_type 为包含 action_code 和 action_name 的对象 msg 列表,每个 msg 的 result_type 为包含 action_code 和 action_name 的对象
""" """
result_types = msg.get("result_type", []) ori_alerts = msg.get("ori_alert", [])
if not isinstance(result_types, list):
# 如果 result_type 已经是单个值,直接返回
return [msg]
if not result_types: # 如果没有 ori_alert 或为空,直接返回原消息
return [msg] if not ori_alerts:
new_msg = msg.copy()
new_msg.pop("ori_alert", None)
return [new_msg]
result = [] result = []
for action_code in result_types: for alert_item in ori_alerts:
action_code = alert_item.get("action")
if not action_code:
continue
new_msg = msg.copy() new_msg = msg.copy()
# 处理 image优先使用 ori_alert 中的 image否则使用原来的
alert_image = alert_item.get("image")
if alert_image is not None:
try:
new_msg["image_base64"] = self._encode_image_to_base64(alert_image)
except Exception as e:
logger.warning(f"[WARN] Failed to encode alert image: {e}, using original")
# 设置 result_type
new_msg["result_type"] = { new_msg["result_type"] = {
"action_code": action_code, "action_code": action_code,
"action_name": get_alert_label(action_code) "action_name": get_alert_label(action_code)
} }
# 移除 ori_alert
new_msg.pop("ori_alert", None)
result.append(new_msg) result.append(new_msg)
return result return result
@@ -331,8 +350,8 @@ class BaseFrameProcessorWorker(threading.Thread):
if segment_path: if segment_path:
mp4_path = self._create_or_get_video_clip(segment_path, segment_duration) mp4_path = self._create_or_get_video_clip(segment_path, segment_duration)
# 展开 result_type # 展开 ori_alert
expanded_msgs = self._expand_msg_by_result_type(msg) expanded_msgs = self._expand_msg_by_ori_alert(msg)
# 发送每个展开后的消息 # 发送每个展开后的消息
for expanded_msg in expanded_msgs: for expanded_msg in expanded_msgs:
@@ -424,31 +443,35 @@ class BaseFrameProcessorWorker(threading.Thread):
"camera_id": item["camera_index"], "camera_id": item["camera_index"],
"timestamp": ts, "timestamp": ts,
"result_type": push_actions, "result_type": push_actions,
"image_base64": img_b64, "image_base64": img_b64
} }
# 先推送到ws队列
try: try:
self.ws_queue.put(msg, timeout=1.0) self.ws_queue.put(msg, timeout=1.0)
if push_actions and len(push_actions) > 0:
# 构建消息
post_msg = msg.copy()
post_msg['type'] = self.POST_TYPE
#备用backup
#self.post_executor.submit(self._post_alert, post_msg)
# 获取视频相关信息仅HLS模式有
segment_path = item.get("segment_path")
segment_duration = item.get("segment_duration")
# 提交到线程池执行包含视频剪辑和POST
self.post_executor.submit(
self._process_alert_with_video,
post_msg,
segment_path,
segment_duration
)
except queue.Full: except queue.Full:
logger.warning("[WARN] ws_send_queue full, drop frame message") logger.warning("[WARN] ws_send_queue full, drop frame message")
# 如果有告警需要POST即使ws队列满了也要发送
if push_actions and len(push_actions) > 0:
# 构建消息
post_msg = msg.copy()
post_msg['type'] = self.POST_TYPE
post_msg['ori_alert'] = result_alerts
#备用backup
#self.post_executor.submit(self._post_alert, post_msg)
# 获取视频相关信息仅HLS模式有
segment_path = item.get("segment_path")
segment_duration = item.get("segment_duration")
# 提交到线程池执行包含视频剪辑和POST
self.post_executor.submit(
self._process_alert_with_video,
post_msg,
segment_path,
segment_duration
)
except Exception as e: except Exception as e:
logger.error( logger.error(