diff --git a/biz/checkpoint/checkpoint_biz.py b/biz/checkpoint/checkpoint_biz.py index d9619e9..aae97bc 100644 --- a/biz/checkpoint/checkpoint_biz.py +++ b/biz/checkpoint/checkpoint_biz.py @@ -7,6 +7,7 @@ import threading import time import queue import requests +from concurrent.futures import ThreadPoolExecutor from common.contants import ALERT_PUSH_URL # -------------------------- Kadian 检测相关导入 -------------------------- @@ -813,7 +814,7 @@ class KadianDetector: # ========================= 帧处理线程 ========================= class FrameProcessorWorker(threading.Thread): - def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None): + def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None, post_workers: int = 4): super().__init__(daemon=True) self.raw_queue = raw_queue self.ws_queue = ws_queue @@ -828,6 +829,20 @@ class FrameProcessorWorker(threading.Thread): self.last_alert_push_time: Dict[int, Dict[str, float]] = {} + # 线程池用于异步发送 POST 请求 + self.post_executor = ThreadPoolExecutor(max_workers=post_workers, thread_name_prefix="alert_post") + + def _post_alert(self, msg: dict): + """异步发送告警 POST 请求(在线程池中执行)""" + try: + response = requests.post(ALERT_PUSH_URL, json=msg, timeout=5.0) + if response.status_code == 200: + print(f"[INFO] POST alert sent successfully for actions: {msg.get('result_type')}") + else: + print(f"[WARN] POST alert failed with status: {response.status_code}") + except Exception as e: + print(f"[ERROR] POST alert request failed: {e}") + def _encode_base64(self, img): _, buf = cv2.imencode(".jpg", img) return base64.b64encode(buf).decode("ascii") @@ -908,17 +923,10 @@ class FrameProcessorWorker(threading.Thread): try: self.ws_queue.put(msg, timeout=1.0) if push_actions and len(push_actions) > 0: - # 发送POST请求 + # 异步发送 POST 请求(提交到线程池) post_msg = msg.copy() post_msg['type'] = 1 - try: - response = requests.post(ALERT_PUSH_URL, json=post_msg, timeout=5.0) - if response.status_code == 200: - print(f"[INFO] POST alert sent successfully for actions: {push_actions}") - else: - print(f"[WARN] POST alert failed with status: {response.status_code}") - except Exception as e: - print(f"[ERROR] POST alert request failed: {e}") + self.post_executor.submit(self._post_alert, post_msg) except queue.Full: logger.warning("[WARN] ws_send_queue full, drop frame message") @@ -934,3 +942,6 @@ class FrameProcessorWorker(threading.Thread): # 继续处理下一帧,不要退出循环 finally: self.raw_queue.task_done() + + # 线程退出时关闭线程池 + self.post_executor.shutdown(wait=False)