# main_start.py
# Main launcher: reads the configuration and starts rtsp_service_ws_kadian.py
# through subprocess.
# Supports the start, stop, restart and status commands; defaults to start.

import argparse
import base64
import json
import os
import signal
import subprocess
import sys
import time
from typing import Dict, List, Optional

import yaml

from common.camera_config import CameraConfig
from utils.logger import get_logger

logger = get_logger(__name__)

# Directory holding one PID file per service group
PID_DIR = "pids"


def load_service_groups_from_yaml(config_path: str = "config.yaml") -> List[dict]:
    """Load the service-group definitions from a YAML file.

    Args:
        config_path: Path of the YAML configuration file.

    Returns:
        The value of the top-level ``service_groups`` key, or an empty list
        when the file is empty, is not a mapping, or the key is missing/null.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the file is not valid YAML.
    """
    with open(config_path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    # safe_load returns None for an empty document, and the top level is not
    # guaranteed to be a mapping.
    if not isinstance(cfg, dict):
        return []
    return cfg.get("service_groups") or []


def cameras_to_base64_json(cameras: List[dict]) -> str:
    """Serialize the camera configs to JSON and return it base64-encoded.

    Args:
        cameras: JSON-serializable camera configuration entries.

    Returns:
        ASCII base64 text of the UTF-8 encoded JSON document.
    """
    json_str = json.dumps(cameras, ensure_ascii=False)
    return base64.b64encode(json_str.encode("utf-8")).decode("ascii")


def get_pid_file_path(group_name: str) -> str:
    """Return the PID file path used for the given service group."""
    return os.path.join(PID_DIR, f"{group_name}.pid")


def ensure_pid_dir():
    """Create the PID directory if it does not exist yet."""
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(PID_DIR, exist_ok=True)


def _remove_pid_file(group_name: str) -> None:
    """Best-effort removal of a group's PID file; a missing file is fine."""
    pid_file = get_pid_file_path(group_name)
    try:
        os.remove(pid_file)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning(f"[WARN] Failed to remove PID file {pid_file}: {e}")


def save_pid(pid: int, group_name: str):
    """Write a process ID to the group's PID file.

    Failures are logged and not raised, so a PID-file problem never masks
    the fact that the service itself was started.

    Args:
        pid: Process ID to record.
        group_name: Service group the process belongs to.
    """
    pid_file = get_pid_file_path(group_name)
    try:
        ensure_pid_dir()
        with open(pid_file, "w") as f:
            f.write(str(pid))
        logger.info(f"[INFO] Saved PID {pid} to {pid_file}")
    except OSError as e:
        logger.error(f"[ERROR] Failed to save PID file: {e}")


def read_pid(group_name: str) -> Optional[int]:
    """Read the process ID stored in the group's PID file.

    Args:
        group_name: Service group whose PID file is read.

    Returns:
        The positive PID, or ``None`` when the file is missing, unreadable,
        or does not contain a positive integer.
    """
    pid_file = get_pid_file_path(group_name)
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as e:
        logger.error(f"[ERROR] Failed to read PID file: {e}")
        return None
    # PIDs <= 0 address process groups / all processes when passed to
    # os.kill, so they must never be accepted from a file.
    if pid <= 0:
        logger.error(f"[ERROR] Failed to read PID file: invalid PID {pid} in {pid_file}")
        return None
    return pid


def is_process_running(pid: int) -> bool:
    """Check whether a process with the given PID exists.

    Args:
        pid: Process ID to probe.

    Returns:
        ``True`` if the process exists (even if it belongs to another user),
        ``False`` otherwise or when ``pid`` is not a positive integer.
    """
    if not pid or pid <= 0:
        return False
    try:
        # Signal 0 performs the existence/permission check without
        # delivering a signal.
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but we are not allowed to signal it.
        return True
    except OSError:
        return False
    return True


def start_rtsp_service(group: dict, script_path: str = "rtsp_service_ws_kadian.py"):
    """Start the RTSP service for one group as a detached subprocess.

    Args:
        group: Service-group mapping; the keys ``cameras``, ``ws_host``,
            ``ws_port``, ``algorithm`` and ``name`` are read, each with a
            default.
        script_path: Script executed with the current Python interpreter.

    Returns:
        A ``(process, group_name)`` tuple where ``process`` is the
        ``subprocess.Popen`` object.

    Raises:
        OSError: If the subprocess cannot be spawned.
    """
    cameras = group.get("cameras", [])
    ws_host = group.get("ws_host", "0.0.0.0")
    ws_port = group.get("ws_port", 8765)
    algorithm = group.get("algorithm", "")
    group_name = group.get("name", "default")

    cameras_base64 = cameras_to_base64_json(cameras)

    # Popen only accepts string arguments; YAML scalars may be ints/bools.
    cmd = [
        sys.executable, script_path,
        "--cameras", cameras_base64,
        "--ws-host", str(ws_host),
        "--ws-port", str(ws_port),
        "--algorithm", str(algorithm),
    ]

    logger.info(f"[INFO] Starting service group '{group_name}': ws={ws_host}:{ws_port}, algorithm={algorithm}")
    # start_new_session detaches the child from this launcher's session so
    # it keeps running after the launcher exits.
    process = subprocess.Popen(cmd, start_new_session=True)
    return process, group_name


def start_service():
    """Start every configured service group that is not already running.

    Returns:
        ``True`` if at least one group was started, ``False`` otherwise
        (including when no groups are configured).
    """
    config_path = "config.yaml"

    # 1. Load the configuration
    service_groups = load_service_groups_from_yaml(config_path)
    logger.info(f"[INFO] Loaded {len(service_groups)} service groups from {config_path}")

    if not service_groups:
        logger.error("[ERROR] No service groups found in config, exiting...")
        return False

    # 2. Start each service group
    started_count = 0
    for group in service_groups:
        group_name = group.get("name", "default")

        # Skip groups that are already running
        pid = read_pid(group_name)
        if pid and is_process_running(pid):
            logger.warning(f"[WARN] Service group '{group_name}' is already running with PID {pid}")
            continue

        try:
            process, name = start_rtsp_service(group)
            # Give the child a moment so an immediate crash (bad arguments,
            # missing script, ...) is detected instead of reported as started.
            time.sleep(0.5)
            exit_code = process.poll()
            if exit_code is not None:
                logger.error(f"[ERROR] Service group '{name}' exited immediately with code {exit_code}")
                continue
            save_pid(process.pid, name)
            logger.info(f"[INFO] Service group '{name}' started with PID {process.pid}")
            started_count += 1
        except Exception as e:
            logger.error(f"[ERROR] Failed to start service group '{group_name}': {e}")

    logger.info(f"[INFO] Started {started_count}/{len(service_groups)} service groups")
    return started_count > 0


def status_service():
    """Log the running state of every configured service group.

    Stale PID files (PID recorded but process gone) are removed.

    Returns:
        ``True`` if at least one group is running, ``False`` otherwise.
    """
    config_path = "config.yaml"
    service_groups = load_service_groups_from_yaml(config_path)

    if not service_groups:
        logger.info("[INFO] No service groups configured")
        return False

    running_count = 0
    for group in service_groups:
        group_name = group.get("name", "default")
        pid = read_pid(group_name)

        if pid and is_process_running(pid):
            logger.info(f"[INFO] Service group '{group_name}' is running with PID {pid}")
            running_count += 1
        else:
            logger.info(f"[INFO] Service group '{group_name}' is not running")
            # Clean up the stale PID file
            if pid:
                _remove_pid_file(group_name)

    logger.info(f"[INFO] {running_count}/{len(service_groups)} service groups running")
    return running_count > 0


def _wait_for_exit(pid: int, timeout: float, interval: float = 0.2) -> bool:
    """Poll until the process is gone or ``timeout`` seconds elapse.

    Returns ``True`` if the process is no longer running.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not is_process_running(pid):
            return True
        time.sleep(interval)
    return not is_process_running(pid)


def stop_service(force=False):
    """Stop every configured service group.

    Each running group receives SIGTERM and gets up to 10 seconds to exit
    before SIGKILL is sent; with ``force`` SIGKILL is sent right away. The
    PID file is removed once the process is gone.

    Args:
        force: Send SIGKILL immediately instead of attempting SIGTERM first.

    Returns:
        Always ``True``; per-group failures are logged.
    """
    config_path = "config.yaml"
    service_groups = load_service_groups_from_yaml(config_path)

    if not service_groups:
        logger.info("[INFO] No service groups configured")
        return True

    stopped_count = 0
    for group in service_groups:
        group_name = group.get("name", "default")
        pid = read_pid(group_name)

        if not pid:
            logger.info(f"[INFO] Service group '{group_name}' has no PID file")
            continue

        if not is_process_running(pid):
            logger.warning(f"[WARN] Service group '{group_name}' PID {pid} not running, cleaning up")
            _remove_pid_file(group_name)
            stopped_count += 1
            continue

        try:
            if force:
                logger.info(f"[INFO] Force killing service group '{group_name}' (PID {pid})")
                os.kill(pid, signal.SIGKILL)
            else:
                logger.info(f"[INFO] Stopping service group '{group_name}' (PID {pid})")
                os.kill(pid, signal.SIGTERM)

                # Wait for a graceful exit, then escalate
                if not _wait_for_exit(pid, timeout=10.0):
                    logger.warning(f"[WARN] Force killing service group '{group_name}'")
                    os.kill(pid, signal.SIGKILL)

            # SIGKILL is asynchronous too; give the kernel a moment.
            if not _wait_for_exit(pid, timeout=2.0):
                # Keep the PID file so the process is not lost track of.
                logger.error(f"[ERROR] Failed to stop service group '{group_name}': PID {pid} still running")
                continue
        except ProcessLookupError:
            # The process exited between the liveness check and the signal.
            pass
        except Exception as e:
            logger.error(f"[ERROR] Failed to stop service group '{group_name}': {e}")
            continue

        # Clean up the PID file
        _remove_pid_file(group_name)
        logger.info(f"[INFO] Service group '{group_name}' stopped")
        stopped_count += 1

    logger.info(f"[INFO] Stopped {stopped_count}/{len(service_groups)} service groups")
    return True


def restart_service():
    """Stop all service groups, pause briefly, then start them again.

    Returns:
        The result of ``start_service()``.
    """
    logger.info("[INFO] Restarting all service groups...")
    stop_service()
    # Short pause before starting again.
    time.sleep(2)
    return start_service()


def main():
    """Parse the command line and run the requested command.

    Exits with status 0 on success and 1 on failure; ``status`` always
    exits with 0.
    """
    parser = argparse.ArgumentParser(description="RTSP Service Manager")
    parser.add_argument("command", nargs="?", choices=["start", "stop", "restart", "status"],
                        default="start",
                        help="Command to execute: start, stop, restart, status (default: start)")
    parser.add_argument("--force", action="store_true",
                        help="Force stop (send SIGKILL immediately)")

    args = parser.parse_args()

    if args.command == "start":
        success = start_service()
        sys.exit(0 if success else 1)
    elif args.command == "stop":
        success = stop_service(force=args.force)
        sys.exit(0 if success else 1)
    elif args.command == "restart":
        success = restart_service()
        sys.exit(0 if success else 1)
    elif args.command == "status":
        status_service()
        sys.exit(0)


if __name__ == "__main__":
    main()