post消息放入子线程队列(未测试)
This commit is contained in:
@@ -7,6 +7,7 @@ import threading
|
|||||||
import time
|
import time
|
||||||
import queue
|
import queue
|
||||||
import requests
|
import requests
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from common.contants import ALERT_PUSH_URL
|
from common.contants import ALERT_PUSH_URL
|
||||||
|
|
||||||
# -------------------------- Kadian 检测相关导入 --------------------------
|
# -------------------------- Kadian 检测相关导入 --------------------------
|
||||||
@@ -813,7 +814,7 @@ class KadianDetector:
|
|||||||
|
|
||||||
# ========================= 帧处理线程 =========================
|
# ========================= 帧处理线程 =========================
|
||||||
class FrameProcessorWorker(threading.Thread):
|
class FrameProcessorWorker(threading.Thread):
|
||||||
def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None):
|
def __init__(self, raw_queue: queue.Queue, ws_queue: queue.Queue, stop_event: threading.Event, cameras=None, post_workers: int = 4):
|
||||||
super().__init__(daemon=True)
|
super().__init__(daemon=True)
|
||||||
self.raw_queue = raw_queue
|
self.raw_queue = raw_queue
|
||||||
self.ws_queue = ws_queue
|
self.ws_queue = ws_queue
|
||||||
@@ -828,6 +829,20 @@ class FrameProcessorWorker(threading.Thread):
|
|||||||
|
|
||||||
self.last_alert_push_time: Dict[int, Dict[str, float]] = {}
|
self.last_alert_push_time: Dict[int, Dict[str, float]] = {}
|
||||||
|
|
||||||
|
# 线程池用于异步发送 POST 请求
|
||||||
|
self.post_executor = ThreadPoolExecutor(max_workers=post_workers, thread_name_prefix="alert_post")
|
||||||
|
|
||||||
|
def _post_alert(self, msg: dict):
|
||||||
|
"""异步发送告警 POST 请求(在线程池中执行)"""
|
||||||
|
try:
|
||||||
|
response = requests.post(ALERT_PUSH_URL, json=msg, timeout=5.0)
|
||||||
|
if response.status_code == 200:
|
||||||
|
print(f"[INFO] POST alert sent successfully for actions: {msg.get('result_type')}")
|
||||||
|
else:
|
||||||
|
print(f"[WARN] POST alert failed with status: {response.status_code}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] POST alert request failed: {e}")
|
||||||
|
|
||||||
def _encode_base64(self, img):
|
def _encode_base64(self, img):
|
||||||
_, buf = cv2.imencode(".jpg", img)
|
_, buf = cv2.imencode(".jpg", img)
|
||||||
return base64.b64encode(buf).decode("ascii")
|
return base64.b64encode(buf).decode("ascii")
|
||||||
@@ -908,17 +923,10 @@ class FrameProcessorWorker(threading.Thread):
|
|||||||
try:
|
try:
|
||||||
self.ws_queue.put(msg, timeout=1.0)
|
self.ws_queue.put(msg, timeout=1.0)
|
||||||
if push_actions and len(push_actions) > 0:
|
if push_actions and len(push_actions) > 0:
|
||||||
# 发送POST请求
|
# 异步发送 POST 请求(提交到线程池)
|
||||||
post_msg = msg.copy()
|
post_msg = msg.copy()
|
||||||
post_msg['type'] = 1
|
post_msg['type'] = 1
|
||||||
try:
|
self.post_executor.submit(self._post_alert, post_msg)
|
||||||
response = requests.post(ALERT_PUSH_URL, json=post_msg, timeout=5.0)
|
|
||||||
if response.status_code == 200:
|
|
||||||
print(f"[INFO] POST alert sent successfully for actions: {push_actions}")
|
|
||||||
else:
|
|
||||||
print(f"[WARN] POST alert failed with status: {response.status_code}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[ERROR] POST alert request failed: {e}")
|
|
||||||
except queue.Full:
|
except queue.Full:
|
||||||
logger.warning("[WARN] ws_send_queue full, drop frame message")
|
logger.warning("[WARN] ws_send_queue full, drop frame message")
|
||||||
|
|
||||||
@@ -934,3 +942,6 @@ class FrameProcessorWorker(threading.Thread):
|
|||||||
# 继续处理下一帧,不要退出循环
|
# 继续处理下一帧,不要退出循环
|
||||||
finally:
|
finally:
|
||||||
self.raw_queue.task_done()
|
self.raw_queue.task_done()
|
||||||
|
|
||||||
|
# 线程退出时关闭线程池
|
||||||
|
self.post_executor.shutdown(wait=False)
|
||||||
|
|||||||
Reference in New Issue
Block a user