将camera_config提取到单独文件

This commit is contained in:
zqc
2026-02-26 14:03:48 +08:00
parent 010b76b8a1
commit 967f783ab6
2 changed files with 11 additions and 14 deletions

View File

@@ -12,7 +12,9 @@ import yaml
from dataclasses import dataclass
from biz.checkpoint.checkpoint_biz import FrameProcessorWorker
from common.camera_config import CameraConfig
from test_cam import get_camera_preview_url
from utils.web_socket_sender import WebSocketSender
@@ -23,19 +25,6 @@ WS_HOST = "0.0.0.0"
WS_PORT = 8765
# ========================= 数据结构 =========================
@dataclass
class CameraConfig:
id: int
name: str
index: str
rtsp_url: str
params: dict = None # 额外参数字典,可选
# ========================= RTSP 抓流线程 =========================
class RTSPCaptureWorker(threading.Thread):
def __init__(self, camera_cfg: CameraConfig, raw_queue: queue.Queue, stop_event: threading.Event):
@@ -185,7 +174,7 @@ class RTSPService:
def __init__(self, config_path: str = "config.yaml"):
with open(config_path, "r", encoding="utf-8") as f:
cfg = yaml.safe_load(f)
self.cameras = [CameraConfig(id=c["id"], name=c.get("name", f"cam_{c['id']}"), index = c["index"], rtsp_url=c["rtsp_url"], params=c.get("params"))
self.cameras = [CameraConfig(id=c["id"], name=c.get("name", f"cam_{c['id']}"), index = c["index"], params=c.get("params"))
for c in cfg.get("cameras", [])]
self.stop_event = threading.Event()