diff --git a/biz/base_frame_processor.py b/biz/base_frame_processor.py index c8a3a59..1fdef68 100644 --- a/biz/base_frame_processor.py +++ b/biz/base_frame_processor.py @@ -75,6 +75,32 @@ class BaseFrameProcessorWorker(threading.Thread): raise RuntimeError("Failed to encode image to JPEG") return base64.b64encode(buf.tobytes()).decode("ascii") + def _expand_msg_by_result_type(self, msg: dict) -> list: + """ + 将 msg 中的 result_type 从数组展开为多个独立的 msg + + Args: + msg: 原始消息,result_type 为数组 + + Returns: + msg 列表,每个 msg 的 result_type 为数组中的单个元素 + """ + result_types = msg.get("result_type", []) + if not isinstance(result_types, list): + # 如果 result_type 已经是单个值,直接返回 + return [msg] + + if not result_types: + return [msg] + + result = [] + for r_type in result_types: + new_msg = msg.copy() + new_msg["result_type"] = r_type + result.append(new_msg) + + return result + def _post_alert(self, msg: dict): """异步发送告警 POST 请求(在线程池中执行)""" try: @@ -180,7 +206,10 @@ class BaseFrameProcessorWorker(threading.Thread): # 异步发送 POST 请求(提交到线程池) post_msg = msg.copy() post_msg['type'] = self.POST_TYPE - self.post_executor.submit(self._post_alert, post_msg) + # 展开 result_type 为多个独立的 msg + expanded_msgs = self._expand_msg_by_result_type(post_msg) + for expanded_msg in expanded_msgs: + self.post_executor.submit(self._post_alert, expanded_msg) except queue.Full: logger.warning("[WARN] ws_send_queue full, drop frame message")