增加main_start.py,通过此入口启动程序

This commit is contained in:
zqc
2026-02-26 15:29:48 +08:00
parent 267f0e3080
commit 04bd6d3e15
3 changed files with 134 additions and 7 deletions

View File

@@ -34,4 +34,9 @@ cameras:
- id: 1
index: testindexcode
name: Entrance
rtsp_url: rtsp://localhost:8554/test
params:
roi_points:
- [0.15, 0.001]
- [0.5, 0.001]
- [1.0, 0.8]
- [0.35, 1.0]

70
main_start.py Normal file
View File

@@ -0,0 +1,70 @@
# main_start.py
# 主启动脚本:读取配置并通过 subprocess 启动 rtsp_service_ws_kadian.py
import json
import yaml
import base64
import subprocess
import sys
from typing import List
from common.camera_config import CameraConfig
from utils.logger import get_logger
logger = get_logger(__name__)
def load_cameras_from_yaml(config_path: str = "config.yaml") -> List[dict]:
"""从 YAML 文件加载摄像头配置"""
with open(config_path, "r", encoding="utf-8") as f:
cfg = yaml.safe_load(f)
return cfg.get("cameras", [])
def cameras_to_base64_json(cameras: List[dict]) -> str:
"""将摄像头配置转换为 base64 编码的 JSON 字符串"""
json_str = json.dumps(cameras, ensure_ascii=False)
return base64.b64encode(json_str.encode('utf-8')).decode('ascii')
def start_rtsp_service(cameras_base64: str, script_path: str = "rtsp_service_ws_kadian.py"):
"""启动 RTSP 服务子进程"""
cmd = [
sys.executable, # 当前 Python 解释器路径
script_path,
"--cameras", cameras_base64
]
logger.info(f"[INFO] Starting RTSP service with command: python {script_path} --cameras <base64_config>")
# 使用 Popen 启动子进程,保持运行
process = subprocess.Popen(cmd)
return process
if __name__ == "__main__":
config_path = "config.yaml"
# 1. 读取配置
cameras_data = load_cameras_from_yaml(config_path)
logger.info(f"[INFO] Loaded {len(cameras_data)} cameras from {config_path}")
if not cameras_data:
logger.error("[ERROR] No cameras found in config, exiting...")
sys.exit(1)
# 2. 转换为 base64 JSON
cameras_base64 = cameras_to_base64_json(cameras_data)
# 3. 启动子进程
process = start_rtsp_service(cameras_base64)
# 4. 等待子进程结束
try:
process.wait()
except KeyboardInterrupt:
logger.info("[INFO] Received interrupt, terminating subprocess...")
process.terminate()
process.wait()
logger.info("[INFO] Subprocess terminated")

View File

@@ -5,10 +5,14 @@
import cv2
import os
import sys
import json
import time
import argparse
import threading
import queue
import yaml
import base64
from dataclasses import dataclass
@@ -171,11 +175,8 @@ class RTSPCaptureWorker(threading.Thread):
# ========================= 服务主类 =========================
class RTSPService:
def __init__(self, config_path: str = "config.yaml"):
with open(config_path, "r", encoding="utf-8") as f:
cfg = yaml.safe_load(f)
self.cameras = [CameraConfig(id=c["id"], name=c.get("name", f"cam_{c['id']}"), index = c["index"], params=c.get("params"))
for c in cfg.get("cameras", [])]
def __init__(self, cameras: list[CameraConfig]):
self.cameras = cameras
self.stop_event = threading.Event()
self.raw_queue = queue.Queue(maxsize=2)
@@ -205,8 +206,59 @@ class RTSPService:
logger.info("[INFO] Service stopped")
def parse_cameras_from_json(json_str: str) -> list[CameraConfig]:
"""从 JSON 字符串解析摄像头配置(支持 base64 编码)"""
try:
# 尝试 base64 解码
try:
decoded = base64.b64decode(json_str).decode('utf-8')
cameras_data = json.loads(decoded)
except Exception:
# 如果不是 base64直接解析 JSON
cameras_data = json.loads(json_str)
return [CameraConfig(
id=c["id"],
name=c.get("name", f"cam_{c['id']}"),
index=c.get("index"),
params=c.get("params")
) for c in cameras_data]
except Exception as e:
logger.error(f"[ERROR] Failed to parse cameras JSON: {e}")
return []
def parse_cameras_from_yaml(yaml_path: str) -> list[CameraConfig]:
"""从 YAML 文件解析摄像头配置"""
with open(yaml_path, "r", encoding="utf-8") as f:
cfg = yaml.safe_load(f)
return [CameraConfig(
id=c["id"],
name=c.get("name", f"cam_{c['id']}"),
index=c.get("index"),
params=c.get("params")
) for c in cfg.get("cameras", [])]
if __name__ == "__main__":
service = RTSPService("config.yaml")
parser = argparse.ArgumentParser(description="RTSP Service for Kadian Detection")
parser.add_argument("--cameras", type=str, help="Cameras config in JSON format (or base64 encoded JSON)")
parser.add_argument("--config", type=str, default="config.yaml", help="Path to config YAML file")
args = parser.parse_args()
# 优先使用命令行传入的 cameras JSON否则读取配置文件
if args.cameras:
cameras = parse_cameras_from_json(args.cameras)
logger.info(f"[INFO] Loaded {len(cameras)} cameras from command line argument")
else:
cameras = parse_cameras_from_yaml(args.config)
logger.info(f"[INFO] Loaded {len(cameras)} cameras from config file: {args.config}")
if not cameras:
logger.error("[ERROR] No cameras configured, exiting...")
sys.exit(1)
service = RTSPService(cameras)
service.start()
try:
while True: