From 5bf0936da674e8d9269d43a18efa3a180a44aece Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Tue, 3 Mar 2026 11:11:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E4=BF=AE=E5=A4=8D=E7=94=BB?= =?UTF-8?q?=E9=9D=A2=E5=BB=B6=E8=BF=9F=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hls_service_ws_kadian.py | 2 +- utils/web_socket_sender.py | 60 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/hls_service_ws_kadian.py b/hls_service_ws_kadian.py index 44319fa..4328047 100644 --- a/hls_service_ws_kadian.py +++ b/hls_service_ws_kadian.py @@ -331,7 +331,7 @@ class HLSKadianService: self.stop_event = threading.Event() self.raw_queue = queue.Queue(maxsize=3) # 原始队列,容量较小 - self.ws_queue = queue.Queue(maxsize=1000) # WebSocket队列 + self.ws_queue = queue.Queue(maxsize=10) # WebSocket队列,减小容量防止延迟累积 self.frame_processor_workers = [] self.biz_processor = get_processor(self.algorithm)(self.raw_queue, self.ws_queue, self.stop_event, self.cameras) diff --git a/utils/web_socket_sender.py b/utils/web_socket_sender.py index c59d773..a4c0a9f 100644 --- a/utils/web_socket_sender.py +++ b/utils/web_socket_sender.py @@ -1,4 +1,5 @@ + import json import asyncio import websockets @@ -26,18 +27,71 @@ class WebSocketSender(threading.Thread): finally: self.ws_clients.discard(websocket) + def _get_latest_msg(self): + """ + 从队列获取最新消息,丢弃旧消息 + 返回: (最新消息, 丢弃数量) + """ + msg = None + dropped = 0 + try: + # 获取第一条消息 + msg = self.send_queue.get_nowait() + dropped = 0 + + # 尝试获取更新的消息,丢弃旧的 + while True: + try: + msg = self.send_queue.get_nowait() + dropped += 1 + except queue.Empty: + break + + if dropped > 0: + self.send_queue.task_done() # 为第一条标记完成 + except queue.Empty: + pass + + return msg, dropped + async def _broadcaster(self): while not self.stop_event.is_set(): try: - msg = await asyncio.to_thread(self.send_queue.get, timeout=0.5) + # 使用短超时获取消息 + msg = await asyncio.to_thread(self.send_queue.get, timeout=0.1) except queue.Empty: + # 尝试获取最新消息(丢弃队列中的旧消息) + msg, dropped = await asyncio.to_thread(self._get_latest_msg) + if msg is None: + continue + if dropped > 0: + logger.debug(f"[DEBUG] Dropped {dropped} old frames to catch up") + + # 尝试丢弃更多旧消息,只保留最新 + dropped = 0 + while True: + try: + msg = self.send_queue.get_nowait() + dropped += 1 + except queue.Empty: + break + if dropped > 0: + logger.debug(f"[DEBUG] Dropped {dropped} queued frames, sending latest") + + # 在线程池中执行JSON序列化,避免阻塞事件循环 + try: + data = await asyncio.to_thread(json.dumps, msg) + except Exception as e: + logger.error(f"[ERROR] JSON serialization failed: {e}") + self.send_queue.task_done() continue - data = json.dumps(msg) + dead = [] for ws in list(self.ws_clients): try: await ws.send(data) - except: + except Exception as e: + logger.debug(f"[DEBUG] WebSocket send error: {e}") dead.append(ws) for ws in dead: self.ws_clients.discard(ws)