Files
SupervisorAI/utils/web_socket_sender.py
2026-03-03 11:11:55 +08:00

107 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import asyncio
import websockets
import threading
import queue
from utils.logger import get_logger
logger = get_logger(__name__)
# ========================= WebSocket 服务线程 =========================
class WebSocketSender(threading.Thread):
def __init__(self, send_queue: queue.Queue, stop_event: threading.Event, ws_host: str, ws_port: int):
super().__init__(daemon=True)
self.send_queue = send_queue
self.stop_event = stop_event
self.ws_clients = set()
self.ws_host = ws_host
self.ws_port = ws_port
async def _ws_handler(self, websocket):
self.ws_clients.add(websocket)
try:
async for _ in websocket:
pass
finally:
self.ws_clients.discard(websocket)
def _get_latest_msg(self):
"""
从队列获取最新消息,丢弃旧消息
返回: (最新消息, 丢弃数量)
"""
msg = None
dropped = 0
try:
# 获取第一条消息
msg = self.send_queue.get_nowait()
dropped = 0
# 尝试获取更新的消息,丢弃旧的
while True:
try:
msg = self.send_queue.get_nowait()
dropped += 1
except queue.Empty:
break
if dropped > 0:
self.send_queue.task_done() # 为第一条标记完成
except queue.Empty:
pass
return msg, dropped
async def _broadcaster(self):
while not self.stop_event.is_set():
try:
# 使用短超时获取消息
msg = await asyncio.to_thread(self.send_queue.get, timeout=0.1)
except queue.Empty:
# 尝试获取最新消息(丢弃队列中的旧消息)
msg, dropped = await asyncio.to_thread(self._get_latest_msg)
if msg is None:
continue
if dropped > 0:
logger.debug(f"[DEBUG] Dropped {dropped} old frames to catch up")
# 尝试丢弃更多旧消息,只保留最新
dropped = 0
while True:
try:
msg = self.send_queue.get_nowait()
dropped += 1
except queue.Empty:
break
if dropped > 0:
logger.debug(f"[DEBUG] Dropped {dropped} queued frames, sending latest")
# 在线程池中执行JSON序列化避免阻塞事件循环
try:
data = await asyncio.to_thread(json.dumps, msg)
except Exception as e:
logger.error(f"[ERROR] JSON serialization failed: {e}")
self.send_queue.task_done()
continue
dead = []
for ws in list(self.ws_clients):
try:
await ws.send(data)
except Exception as e:
logger.debug(f"[DEBUG] WebSocket send error: {e}")
dead.append(ws)
for ws in dead:
self.ws_clients.discard(ws)
self.send_queue.task_done()
async def _run_async(self):
async with websockets.serve(self._ws_handler, self.ws_host, self.ws_port):
logger.info(f"[INFO] WebSocket server started at ws://{self.ws_host}:{self.ws_port}")
await self._broadcaster()
def run(self):
asyncio.run(self._run_async())