diff --git a/biz/base_frame_processor.py b/biz/base_frame_processor.py index 4a43084..c8c3826 100644 --- a/biz/base_frame_processor.py +++ b/biz/base_frame_processor.py @@ -86,31 +86,50 @@ class BaseFrameProcessorWorker(threading.Thread): raise RuntimeError("Failed to encode image to JPEG") return base64.b64encode(buf.tobytes()).decode("ascii") - def _expand_msg_by_result_type(self, msg: dict) -> list: + def _expand_msg_by_ori_alert(self, msg: dict) -> list: """ - 将 msg 中的 result_type 从数组展开为多个独立的 msg + 将 msg 中的 ori_alert 数组展开为多个独立的 msg Args: - msg: 原始消息,result_type 为 action code 字符串数组 + msg: 原始消息,包含 ori_alert 数组 + original_image_b64: 原始图像的 base64 编码(作为后备) Returns: msg 列表,每个 msg 的 result_type 为包含 action_code 和 action_name 的对象 """ - result_types = msg.get("result_type", []) - if not isinstance(result_types, list): - # 如果 result_type 已经是单个值,直接返回 - return [msg] + ori_alerts = msg.get("ori_alert", []) - if not result_types: - return [msg] + # 如果没有 ori_alert 或为空,直接返回原消息 + if not ori_alerts: + new_msg = msg.copy() + new_msg.pop("ori_alert", None) + return [new_msg] result = [] - for action_code in result_types: + for alert_item in ori_alerts: + action_code = alert_item.get("action") + if not action_code: + continue + new_msg = msg.copy() + + # 处理 image:优先使用 ori_alert 中的 image,否则使用原来的 + alert_image = alert_item.get("image") + if alert_image is not None: + try: + new_msg["image_base64"] = self._encode_image_to_base64(alert_image) + except Exception as e: + logger.warning(f"[WARN] Failed to encode alert image: {e}, using original") + + # 设置 result_type new_msg["result_type"] = { "action_code": action_code, "action_name": get_alert_label(action_code) } + + # 移除 ori_alert + new_msg.pop("ori_alert", None) + result.append(new_msg) return result @@ -331,8 +350,8 @@ class BaseFrameProcessorWorker(threading.Thread): if segment_path: mp4_path = self._create_or_get_video_clip(segment_path, segment_duration) - # 展开 result_type - expanded_msgs = self._expand_msg_by_result_type(msg) + # 展开 ori_alert + expanded_msgs = self._expand_msg_by_ori_alert(msg) # 发送每个展开后的消息 for expanded_msg in expanded_msgs: @@ -424,31 +443,35 @@ class BaseFrameProcessorWorker(threading.Thread): "camera_id": item["camera_index"], "timestamp": ts, "result_type": push_actions, - "image_base64": img_b64, + "image_base64": img_b64 } + # 先推送到ws队列 try: self.ws_queue.put(msg, timeout=1.0) - if push_actions and len(push_actions) > 0: - # 构建消息 - post_msg = msg.copy() - post_msg['type'] = self.POST_TYPE - - #备用backup - #self.post_executor.submit(self._post_alert, post_msg) - - # 获取视频相关信息(仅HLS模式有) - segment_path = item.get("segment_path") - segment_duration = item.get("segment_duration") - - # 提交到线程池执行(包含视频剪辑和POST) - self.post_executor.submit( - self._process_alert_with_video, - post_msg, - segment_path, - segment_duration - ) except queue.Full: logger.warning("[WARN] ws_send_queue full, drop frame message") + + # 如果有告警,需要POST(即使ws队列满了也要发送) + if push_actions and len(push_actions) > 0: + # 构建消息 + post_msg = msg.copy() + post_msg['type'] = self.POST_TYPE + post_msg['ori_alert'] = result_alerts + + #备用backup + #self.post_executor.submit(self._post_alert, post_msg) + + # 获取视频相关信息(仅HLS模式有) + segment_path = item.get("segment_path") + segment_duration = item.get("segment_duration") + + # 提交到线程池执行(包含视频剪辑和POST) + self.post_executor.submit( + self._process_alert_with_video, + post_msg, + segment_path, + segment_duration + ) except Exception as e: logger.error(