From dc7648a43c4d0369d840ac4b8febaf18ac82d4c3 Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Tue, 3 Mar 2026 10:27:15 +0800 Subject: [PATCH] =?UTF-8?q?post=E6=B6=88=E6=81=AF=E6=94=BE=E5=85=A5?= =?UTF-8?q?=E5=AD=90=E7=BA=BF=E7=A8=8B=E9=98=9F=E5=88=97=EF=BC=88=E6=9C=AA?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- biz/checkpoint/checkpoint_biz.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/biz/checkpoint/checkpoint_biz.py b/biz/checkpoint/checkpoint_biz.py index d9619e9..aae97bc 100644 --- a/biz/checkpoint/checkpoint_biz.py +++ b/biz/checkpoint/checkpoint_biz.py @@ -7,6 +7,7 @@ import threading import time import queue import requests +from concurrent.futures import ThreadPoolExecutor from common.contants import ALERT_PUSH_URL # -------------------------- Kadian 检测相关导入 -------------------------- @@ -813,7 +814,7 @@ class KadianDetector: # ========================= 帧处理线程 ========================= class FrameProcessorWorker(threading.Thread): - def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None): + def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None, post_workers: int = 4): super().__init__(daemon=True) self.raw_queue = raw_queue self.ws_queue = ws_queue @@ -828,6 +829,20 @@ class FrameProcessorWorker(threading.Thread): self.last_alert_push_time: Dict[int, Dict[str, float]] = {} + # 线程池用于异步发送 POST 请求 + self.post_executor = ThreadPoolExecutor(max_workers=post_workers, thread_name_prefix="alert_post") + + def _post_alert(self, msg: dict): + """异步发送告警 POST 请求(在线程池中执行)""" + try: + response = requests.post(ALERT_PUSH_URL, json=msg, timeout=5.0) + if response.status_code == 200: + print(f"[INFO] POST alert sent successfully for actions: {msg.get('result_type')}") + else: + print(f"[WARN] POST alert failed with status: {response.status_code}") + except Exception as e: + print(f"[ERROR] POST alert request failed: {e}") + def _encode_base64(self, img): _, buf = cv2.imencode(".jpg", img) return base64.b64encode(buf).decode("ascii") @@ -908,17 +923,10 @@ class FrameProcessorWorker(threading.Thread): try: self.ws_queue.put(msg, timeout=1.0) if push_actions and len(push_actions) > 0: - # 发送POST请求 + # 异步发送 POST 请求(提交到线程池) post_msg = msg.copy() post_msg['type'] = 1 - try: - response = requests.post(ALERT_PUSH_URL, json=post_msg, timeout=5.0) - if response.status_code == 200: - print(f"[INFO] POST alert sent successfully for actions: {push_actions}") - else: - print(f"[WARN] POST alert failed with status: {response.status_code}") - except Exception as e: - print(f"[ERROR] POST alert request failed: {e}") + self.post_executor.submit(self._post_alert, post_msg) except queue.Full: logger.warning("[WARN] ws_send_queue full, drop frame message") @@ -934,3 +942,6 @@ class FrameProcessorWorker(threading.Thread): # 继续处理下一帧,不要退出循环 finally: self.raw_queue.task_done() + + # 线程退出时关闭线程池 + self.post_executor.shutdown(wait=False)