# main_start.py
# Main launcher: reads the camera configuration and starts
# rtsp_service_ws_kadian.py through subprocess.
# Supports the start, stop, restart and status commands; defaults to start.
import json
import yaml
import base64
import subprocess
import sys
import os
import signal
import argparse
import time
from typing import List, Optional

from common.camera_config import CameraConfig
from utils.logger import get_logger

logger = get_logger(__name__)

# Path of the file that records the PID of the background service.
PID_FILE = "rtsp_service.pid"


def load_cameras_from_yaml(config_path: str = "config.yaml") -> List[dict]:
    """Load the camera configuration list from a YAML file.

    Args:
        config_path: Path of the YAML configuration file.

    Returns:
        The value stored under the top-level ``cameras`` key, or an empty
        list when the document is empty, is not a mapping, or has no
        (or an empty) ``cameras`` entry.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the file is not valid YAML.
    """
    with open(config_path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    # safe_load() yields None for an empty document, and a bare "cameras:"
    # key loads as None; both are treated as "no cameras configured".
    if not isinstance(cfg, dict):
        return []
    return cfg.get("cameras") or []


def cameras_to_base64_json(cameras: List[dict]) -> str:
    """Serialize the camera configuration to a base64-encoded JSON string.

    Args:
        cameras: Camera configuration entries.

    Returns:
        ASCII base64 text of the UTF-8 encoded JSON document.
    """
    json_str = json.dumps(cameras, ensure_ascii=False)
    return base64.b64encode(json_str.encode('utf-8')).decode('ascii')


def start_rtsp_service(cameras_base64: str, script_path: str = "rtsp_service_ws_kadian.py"):
    """Start the RTSP service as a background child process.

    Args:
        cameras_base64: Base64-encoded JSON camera configuration, passed to
            the child through its ``--cameras`` argument.
        script_path: Path of the service script to run.

    Returns:
        The ``subprocess.Popen`` handle of the started process.
    """
    cmd = [
        sys.executable,  # path of the current Python interpreter
        script_path,
        "--cameras", cameras_base64
    ]

    logger.info(f"[INFO] Starting RTSP service with command: python {script_path} --cameras ")
    # start_new_session=True puts the child in a new session, similar in
    # effect to nohup.
    process = subprocess.Popen(cmd, start_new_session=True)
    return process


def save_pid(pid: int):
    """Write the process ID to the PID file; failures are logged, not raised."""
    try:
        with open(PID_FILE, "w") as f:
            f.write(str(pid))
        logger.info(f"[INFO] Saved PID {pid} to {PID_FILE}")
    except OSError as e:
        logger.error(f"[ERROR] Failed to save PID file: {e}")


def read_pid() -> Optional[int]:
    """Read the process ID from the PID file.

    Returns:
        The recorded PID, or ``None`` when the file is missing, unreadable,
        or does not contain a positive integer.
    """
    try:
        with open(PID_FILE, "r") as f:
            pid = int(f.read().strip())
    except FileNotFoundError:
        logger.warning(f"[WARN] PID file {PID_FILE} not found")
        return None
    except (OSError, ValueError) as e:
        logger.error(f"[ERROR] Failed to read PID file: {e}")
        return None
    if pid <= 0:
        # os.kill() treats 0 and negative values as process-group targets,
        # so such a value must never be handed to the signalling code.
        logger.error(f"[ERROR] Failed to read PID file: invalid PID {pid}")
        return None
    return pid


def is_process_running(pid: int) -> bool:
    """Check whether a process with the given PID exists.

    Args:
        pid: Process ID to probe.

    Returns:
        ``True`` if the process exists (even when it belongs to another
        user), ``False`` otherwise or when ``pid`` is not positive.
    """
    if pid <= 0:
        return False
    try:
        # Signal 0 delivers nothing; it only performs the existence and
        # permission checks.
        os.kill(pid, 0)
    except PermissionError:
        # The process exists but we are not allowed to signal it.
        return True
    except OSError:
        return False
    return True


def _remove_pid_file() -> None:
    """Delete the PID file; a missing file is fine, other errors are logged."""
    try:
        os.remove(PID_FILE)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning(f"[WARN] Failed to remove PID file {PID_FILE}: {e}")


def start_service():
    """Start the service.

    Returns:
        ``True`` when the child process was launched and is still alive
        after the start-up wait; ``False`` when the service is already
        running, the configuration is unusable, or the launch failed.
    """
    # Refuse to start a second instance.
    pid = read_pid()
    if pid and is_process_running(pid):
        logger.warning(f"[WARN] Service is already running with PID {pid}")
        return False

    config_path = "config.yaml"

    # 1. Read the configuration.
    try:
        cameras_data = load_cameras_from_yaml(config_path)
    except (OSError, yaml.YAMLError) as e:
        logger.error(f"[ERROR] Failed to load config {config_path}: {e}")
        return False
    logger.info(f"[INFO] Loaded {len(cameras_data)} cameras from {config_path}")

    if not cameras_data:
        logger.error("[ERROR] No cameras found in config, exiting...")
        return False

    try:
        # 2. Convert to base64 JSON.
        cameras_base64 = cameras_to_base64_json(cameras_data)

        # 3. Launch the child process.
        process = start_rtsp_service(cameras_base64)

        # Give the child a moment, then make sure it did not die at start-up.
        time.sleep(1)
        exit_code = process.poll()
        if exit_code is not None:
            logger.error(f"[ERROR] Service exited immediately with code {exit_code}")
            return False

        # 4. Record the PID.
        save_pid(process.pid)

        logger.info(f"[INFO] Service started successfully with PID {process.pid}")
        logger.info("[INFO] Main process exiting, service will continue running in background")
        return True

    except Exception as e:
        logger.error(f"[ERROR] Failed to start service: {e}")
        return False


def status_service():
    """Report whether the service is running.

    A PID file that points at a dead process is removed.

    Returns:
        ``True`` if the recorded process is alive, ``False`` otherwise.
    """
    pid = read_pid()
    if not pid:
        logger.info("[INFO] Service is not running (no PID file)")
        return False

    if is_process_running(pid):
        logger.info(f"[INFO] Service is running with PID {pid}")
        return True

    logger.info(f"[INFO] Service is not running (PID {pid} not found), cleaning up PID file")
    _remove_pid_file()
    return False


def stop_service(force=False):
    """Stop the service.

    Args:
        force: When true, send SIGKILL immediately instead of SIGTERM.

    Returns:
        ``True`` if the process is gone afterwards (including the case where
        it was already dead), ``False`` if there is no PID file, signalling
        failed, or the process survived SIGKILL.
    """
    pid = read_pid()
    if not pid:
        logger.error("[ERROR] No PID file found, service may not be running")
        return False

    if not is_process_running(pid):
        logger.warning(f"[WARN] Process with PID {pid} is not running, cleaning up PID file")
        _remove_pid_file()
        return True

    # NOTE(review): the PID is not verified to still belong to the RTSP
    # service; a stale PID file plus PID reuse would signal another process.
    try:
        if force:
            logger.info(f"[INFO] Force killing process {pid} with SIGKILL")
            os.kill(pid, signal.SIGKILL)
        else:
            logger.info(f"[INFO] Gracefully stopping process {pid} with SIGTERM")
            os.kill(pid, signal.SIGTERM)

        # Wait for the process to exit (at most 10 seconds).
        for _ in range(10):
            if not is_process_running(pid):
                break
            time.sleep(1)

        # Still alive: kill it forcibly.
        if is_process_running(pid):
            logger.warning(f"[WARN] Process {pid} still running after SIGTERM, sending SIGKILL")
            os.kill(pid, signal.SIGKILL)
            time.sleep(1)

    except ProcessLookupError:
        # The process exited between the liveness check and the signal,
        # which is the outcome we wanted.
        pass
    except Exception as e:
        logger.error(f"[ERROR] Failed to stop service: {e}")
        return False

    if is_process_running(pid):
        # Keep the PID file so a later stop/status still finds the process.
        logger.error(f"[ERROR] Failed to stop service: process {pid} is still running after SIGKILL")
        return False

    # Clean up the PID file.
    _remove_pid_file()
    logger.info(f"[INFO] Service stopped successfully (PID: {pid})")
    return True


def restart_service():
    """Restart the service: stop it if it is running, then start it.

    Returns:
        The result of ``start_service()``, or ``False`` when the running
        instance could not be stopped.
    """
    logger.info("[INFO] Restarting service...")

    # Stop first.
    if status_service():
        if not stop_service():
            logger.error("[ERROR] Failed to stop service, restart aborted")
            return False
        time.sleep(2)  # wait for the process to be fully gone

    # Then start again.
    return start_service()


def main():
    """Parse the command line and run the requested command.

    Exits with status 0 on success and 1 on failure; ``status`` always
    exits with 0.
    """
    parser = argparse.ArgumentParser(description="RTSP Service Manager")
    parser.add_argument("command", nargs="?", choices=["start", "stop", "restart", "status"],
                        default="start",
                        help="Command to execute: start, stop, restart, status (default: start)")
    parser.add_argument("--force", action="store_true", help="Force stop (send SIGKILL immediately)")

    args = parser.parse_args()

    if args.command == "start":
        success = start_service()
        sys.exit(0 if success else 1)
    elif args.command == "stop":
        success = stop_service(force=args.force)
        sys.exit(0 if success else 1)
    elif args.command == "restart":
        success = restart_service()
        sys.exit(0 if success else 1)
    elif args.command == "status":
        status_service()
        sys.exit(0)  # the status check always exits with code 0


if __name__ == "__main__":
    main()