# main_start.py
# Main launcher: reads config.yaml and starts the service scripts
# (rtsp_service_ws_kadian.py / hls_service_ws_kadian.py) via subprocess.
# Supports the start, stop, restart and status commands; default is start.

import json
import yaml
import base64
import subprocess
import sys
import os
import signal
import argparse
import time
import glob
from typing import List, Dict, Optional

from common.camera_config import CameraConfig
from utils.logger import get_logger

logger = get_logger(__name__)

# Directory holding the PID files (relative to the current working directory)
PID_DIR = "pids"

# File-name prefix of HLS downloader PID files
HLS_DOWNLOADER_PID_PREFIX = "hls_downloader_"

# Seconds to wait for a signalled process to exit before escalating to SIGKILL
_STOP_WAIT_SECONDS = 10


def _load_yaml_mapping(config_path: str) -> dict:
    """Parse *config_path* and return its top-level mapping.

    ``yaml.safe_load`` returns ``None`` for an empty document and may return
    a non-mapping for other documents; both are normalised to ``{}`` so that
    callers can use ``.get`` unconditionally.

    Raises:
        OSError: the file cannot be opened.
        yaml.YAMLError: the file is not valid YAML.
    """
    with open(config_path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    return cfg if isinstance(cfg, dict) else {}


def load_debug_mode(config_path: str = "config.yaml") -> bool:
    """Return the ``debug_mode`` flag from the config file (False on any error)."""
    try:
        return _load_yaml_mapping(config_path).get("debug_mode", False)
    except Exception:
        return False


def load_downloader_debug_mode(config_path: str = "config.yaml") -> bool:
    """Return the ``downloader_debug_mode`` flag from the config file (False on any error)."""
    try:
        return _load_yaml_mapping(config_path).get("downloader_debug_mode", False)
    except Exception:
        return False


# Debug modes are read from the config file once, at import time
DEBUG_MODE = load_debug_mode()
DOWNLOADER_DEBUG_MODE = load_downloader_debug_mode()


def load_service_groups_from_yaml(config_path: str = "config.yaml") -> List[dict]:
    """Load the ``service_groups`` list from the YAML config.

    Returns an empty list when the key is missing, null, or the document is
    empty. Errors opening or parsing the file propagate to the caller.
    """
    return _load_yaml_mapping(config_path).get("service_groups") or []


def load_hls_config(config_path: str = "config.yaml") -> dict:
    """Load the HLS downloader settings from the config file.

    Returns a dict with the keys ``hls_root_path``, ``daily_rotate_hour``,
    ``retention_days`` and ``retry_interval``. If the file cannot be read or
    parsed, the error is logged and the defaults are returned.
    """
    try:
        cfg = _load_yaml_mapping(config_path)
        return {
            # A null value would later end up in a command line, so coerce to ""
            "hls_root_path": cfg.get("hls_root_path") or "",
            "daily_rotate_hour": cfg.get("hls_downloader_daily_rotate_hour", 3),
            "retention_days": cfg.get("hls_downloader_retention_days", 3),
            "retry_interval": cfg.get("hls_downloader_retry_interval_seconds", 10),
        }
    except Exception as e:
        logger.error(f"[ERROR] Failed to load HLS config: {e}")
        return {
            "hls_root_path": "",
            "daily_rotate_hour": 3,
            "retention_days": 3,
            "retry_interval": 10,
        }


def cameras_to_base64_json(cameras: List[dict]) -> str:
    """Serialise the camera configs to JSON and return it base64-encoded (ASCII)."""
    json_str = json.dumps(cameras, ensure_ascii=False)
    return base64.b64encode(json_str.encode("utf-8")).decode("ascii")


def get_pid_file_path(group_name: str) -> str:
    """Return the PID file path of a service group."""
    return os.path.join(PID_DIR, f"{group_name}.pid")


def get_hls_downloader_pid_file_path(index_code: str) -> str:
    """Return the PID file path of the HLS downloader for *index_code*."""
    return os.path.join(PID_DIR, f"{HLS_DOWNLOADER_PID_PREFIX}{index_code}.pid")


def ensure_pid_dir():
    """Create the PID directory if it does not exist yet."""
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs
    os.makedirs(PID_DIR, exist_ok=True)


def _write_pid_file(pid: int, pid_file: str, label: str) -> None:
    """Write *pid* to *pid_file*; *label* ("" or "HLS downloader ") only shapes the log text."""
    ensure_pid_dir()
    try:
        with open(pid_file, "w") as f:
            f.write(str(pid))
        logger.info(f"[INFO] Saved {label}PID {pid} to {pid_file}")
    except Exception as e:
        logger.error(f"[ERROR] Failed to save {label}PID file: {e}")


def _read_pid_file(pid_file: str, label: str) -> Optional[int]:
    """Return the PID stored in *pid_file*, or None if it is missing or unreadable."""
    try:
        with open(pid_file, "r") as f:
            return int(f.read().strip())
    except FileNotFoundError:
        return None
    except Exception as e:
        logger.error(f"[ERROR] Failed to read {label}PID file: {e}")
        return None


def _remove_pid_file(pid_file: str) -> None:
    """Delete *pid_file*; a missing file is fine, other OS errors are logged."""
    try:
        os.remove(pid_file)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning(f"[WARN] Failed to remove PID file {pid_file}: {e}")


def save_pid(pid: int, group_name: str):
    """Save a service group's process ID to its PID file."""
    _write_pid_file(pid, get_pid_file_path(group_name), "")


def save_hls_downloader_pid(pid: int, index_code: str):
    """Save an HLS downloader's process ID to its PID file."""
    _write_pid_file(pid, get_hls_downloader_pid_file_path(index_code), "HLS downloader ")


def read_pid(group_name: str):
    """Read a service group's process ID from its PID file (None if unavailable)."""
    return _read_pid_file(get_pid_file_path(group_name), "")


def read_hls_downloader_pid(index_code: str):
    """Read an HLS downloader's process ID from its PID file (None if unavailable)."""
    return _read_pid_file(get_hls_downloader_pid_file_path(index_code), "HLS downloader ")


def is_process_running(pid: int):
    """Return True if a process with the given PID exists.

    Uses signal 0, which performs the existence/permission check without
    delivering a signal.
    """
    # pid <= 0 addresses process groups in kill(2); never treat it as a process
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except PermissionError:
        # EPERM: the process exists but belongs to another user
        return True
    except OSError:
        return False
    return True


def get_script_path(video_source_type: str) -> str:
    """Return the service script for the given video source type (default: rtsp)."""
    if video_source_type == "hls":
        return "hls_service_ws_kadian.py"
    return "rtsp_service_ws_kadian.py"


def start_service_group(group: dict, hls_config: Optional[dict] = None):
    """Launch the service subprocess for one service group.

    Args:
        group: service group config (``cameras``, ``ws_host``, ``ws_port``,
            ``algorithm``, ``name``, ``video_source_type``).
        hls_config: HLS settings; only ``hls_root_path`` is used, and only
            for groups whose source type is ``hls``.

    Returns:
        ``(process, group_name)`` where *process* is the ``subprocess.Popen``.
    """
    cameras = group.get("cameras", [])
    ws_host = group.get("ws_host", "0.0.0.0")
    ws_port = group.get("ws_port", 8765)
    algorithm = group.get("algorithm", "")
    group_name = group.get("name", "default")
    video_source_type = group.get("video_source_type", "rtsp")

    # The script is chosen by video source type
    script_path = get_script_path(video_source_type)
    cameras_base64 = cameras_to_base64_json(cameras)

    cmd = [
        sys.executable, script_path,
        "--cameras", cameras_base64,
        "--ws-host", ws_host,
        "--ws-port", str(ws_port),
        "--algorithm", algorithm,
    ]

    # HLS services additionally need the HLS root path
    if video_source_type == "hls" and hls_config:
        cmd.extend(["--hls-root-path", hls_config.get("hls_root_path", "")])

    logger.info(f"[INFO] Starting service group '{group_name}': ws={ws_host}:{ws_port}, algorithm={algorithm}, source={video_source_type}")

    # DEBUG_MODE=True runs in the foreground session for debugging;
    # otherwise the child gets its own session so it outlives this launcher
    if DEBUG_MODE:
        process = subprocess.Popen(cmd)
    else:
        process = subprocess.Popen(cmd, start_new_session=True)

    return process, group_name


def start_hls_downloader(camera: dict, hls_config: dict) -> Optional[subprocess.Popen]:
    """Launch the HLS downloader process for one camera.

    Returns the ``Popen`` object, or None when the camera has no ``index``
    or a downloader for that index is already running.
    """
    index_code = camera.get("index", "")
    camera_name = camera.get("name", "")
    camera_id = camera.get("id", 0)

    if not index_code:
        logger.warning("[WARN] Camera has no index_code, skipping HLS downloader")
        return None

    # Skip if a downloader for this camera is already running
    pid = read_hls_downloader_pid(index_code)
    if pid and is_process_running(pid):
        logger.warning(f"[WARN] HLS downloader for '{index_code}' is already running with PID {pid}")
        return None

    # YAML may yield non-string scalars (e.g. a numeric index code); Popen
    # only accepts strings, so convert explicitly
    cmd = [
        sys.executable, "hls_downloader.py",
        "--index-code", str(index_code),
        "--camera-name", str(camera_name),
        "--camera-id", str(camera_id),
        "--hls-root-path", hls_config.get("hls_root_path", ""),
        "--rotate-hour", str(hls_config.get("daily_rotate_hour", 3)),
        "--retention-days", str(hls_config.get("retention_days", 3)),
        "--retry-interval", str(hls_config.get("retry_interval", 10)),
    ]

    logger.info(f"[INFO] Starting HLS downloader for camera '{camera_name}' (index: {index_code})")

    if DOWNLOADER_DEBUG_MODE:
        process = subprocess.Popen(cmd)
    else:
        process = subprocess.Popen(cmd, start_new_session=True)

    return process


def start_service():
    """Start every configured service group and, for HLS groups, their downloaders.

    Returns True if at least one service group was started by this call.
    """
    config_path = "config.yaml"

    # 1. Load configuration
    service_groups = load_service_groups_from_yaml(config_path)
    hls_config = load_hls_config(config_path)
    logger.info(f"[INFO] Loaded {len(service_groups)} service groups from {config_path}")
    logger.info(f"[INFO] HLS config: root_path={hls_config.get('hls_root_path')}, rotate_hour={hls_config.get('daily_rotate_hour')}")

    if not service_groups:
        logger.error("[ERROR] No service groups found in config, exiting...")
        return False

    # 2. Start each service group
    started_count = 0
    downloader_count = 0
    processes = []  # service children, waited on in debug mode
    downloader_processes = []  # downloader children, waited on in debug mode

    for group in service_groups:
        group_name = group.get("name", "default")
        video_source_type = group.get("video_source_type", "rtsp")
        cameras = group.get("cameras", [])

        # Do not start a group that is already running
        pid = read_pid(group_name)
        if pid and is_process_running(pid):
            logger.warning(f"[WARN] Service group '{group_name}' is already running with PID {pid}")
        else:
            try:
                process, name = start_service_group(group, hls_config)
                time.sleep(0.5)
                if process.poll() is not None:
                    # Surface an early crash; bookkeeping below is unchanged
                    logger.warning(f"[WARN] Service group '{name}' already exited with code {process.returncode}")
                save_pid(process.pid, name)
                logger.info(f"[INFO] Service group '{name}' started with PID {process.pid}")
                processes.append((process, name))
                started_count += 1
            except Exception as e:
                logger.error(f"[ERROR] Failed to start service group '{group_name}': {e}")

        # HLS groups get one downloader process per camera
        if video_source_type == "hls" and hls_config.get("hls_root_path"):
            for camera in cameras:
                index_code = camera.get("index", "")
                if not index_code:
                    continue
                try:
                    downloader_proc = start_hls_downloader(camera, hls_config)
                    if downloader_proc:
                        time.sleep(0.3)
                        save_hls_downloader_pid(downloader_proc.pid, index_code)
                        logger.info(f"[INFO] HLS downloader for '{index_code}' started with PID {downloader_proc.pid}")
                        downloader_processes.append((downloader_proc, index_code))
                        downloader_count += 1
                except Exception as e:
                    logger.error(f"[ERROR] Failed to start HLS downloader for '{index_code}': {e}")

    logger.info(f"[INFO] Started {started_count}/{len(service_groups)} service groups")
    logger.info(f"[INFO] Started {downloader_count} HLS downloaders")

    # In the respective debug mode, block until the children exit
    if DEBUG_MODE and processes:
        logger.info("[DEBUG] Waiting for service processes...")
        for process, _name in processes:
            process.wait()

    if DOWNLOADER_DEBUG_MODE and downloader_processes:
        logger.info("[DEBUG] Waiting for downloader processes...")
        for process, _name in downloader_processes:
            process.wait()

    return started_count > 0


def get_all_hls_downloader_pids() -> List[tuple]:
    """Return ``(index_code, pid_file)`` for every HLS downloader PID file in PID_DIR."""
    result = []
    pid_pattern = os.path.join(PID_DIR, f"{HLS_DOWNLOADER_PID_PREFIX}*.pid")
    for pid_file in glob.glob(pid_pattern):
        # File name layout: hls_downloader_{index_code}.pid
        filename = os.path.basename(pid_file)
        index_code = filename[len(HLS_DOWNLOADER_PID_PREFIX):-len(".pid")]
        result.append((index_code, pid_file))
    return result


def status_service():
    """Log the status of every service group and HLS downloader.

    Stale PID files (PID recorded but process gone) are removed.
    Returns True if at least one service group is running.
    """
    config_path = "config.yaml"
    service_groups = load_service_groups_from_yaml(config_path)

    if not service_groups:
        logger.info("[INFO] No service groups configured")
        return False

    running_count = 0
    for group in service_groups:
        group_name = group.get("name", "default")
        pid = read_pid(group_name)
        if pid and is_process_running(pid):
            logger.info(f"[INFO] Service group '{group_name}' is running with PID {pid}")
            running_count += 1
            continue
        logger.info(f"[INFO] Service group '{group_name}' is not running")
        # Clean up the stale PID file
        if pid:
            _remove_pid_file(get_pid_file_path(group_name))

    logger.info(f"[INFO] {running_count}/{len(service_groups)} service groups running")

    # HLS downloader status
    downloader_pids = get_all_hls_downloader_pids()
    downloader_running = 0
    for index_code, pid_file in downloader_pids:
        pid = read_hls_downloader_pid(index_code)
        if pid and is_process_running(pid):
            logger.info(f"[INFO] HLS downloader '{index_code}' is running with PID {pid}")
            downloader_running += 1
            continue
        logger.info(f"[INFO] HLS downloader '{index_code}' is not running")
        if pid:
            _remove_pid_file(pid_file)

    if downloader_pids:
        logger.info(f"[INFO] {downloader_running}/{len(downloader_pids)} HLS downloaders running")

    return running_count > 0


def _stop_process(label: str, pid: int, pid_file: str, force: bool) -> bool:
    """Stop one process and remove its PID file; return True when it is gone.

    *label* names the target in log messages, e.g. ``service group 'x'``.
    Sends SIGTERM (SIGKILL when *force*), waits up to _STOP_WAIT_SECONDS for
    the process to exit, then escalates to SIGKILL.
    """
    title = label[:1].upper() + label[1:]

    if not is_process_running(pid):
        logger.warning(f"[WARN] {title} PID {pid} not running, cleaning up")
        _remove_pid_file(pid_file)
        return True

    try:
        if force:
            logger.info(f"[INFO] Force killing {label} (PID {pid})")
            os.kill(pid, signal.SIGKILL)
        else:
            logger.info(f"[INFO] Stopping {label} (PID {pid})")
            os.kill(pid, signal.SIGTERM)

        # Wait for the process to exit
        for _ in range(_STOP_WAIT_SECONDS):
            if not is_process_running(pid):
                break
            time.sleep(1)

        if is_process_running(pid):
            logger.warning(f"[WARN] Force killing {label}")
            os.kill(pid, signal.SIGKILL)
            time.sleep(1)
    except ProcessLookupError:
        # The process exited between the liveness check and the signal;
        # that is the desired end state, not a failure
        pass
    except Exception as e:
        logger.error(f"[ERROR] Failed to stop {label}: {e}")
        return False

    _remove_pid_file(pid_file)
    logger.info(f"[INFO] {title} stopped")
    return True


def stop_service(force=False):
    """Stop all service groups and all HLS downloaders.

    Args:
        force: send SIGKILL immediately instead of SIGTERM.

    HLS downloaders are discovered from their PID files, so they are stopped
    even when the config no longer lists any service group. Always returns True.
    """
    config_path = "config.yaml"
    service_groups = load_service_groups_from_yaml(config_path)

    if not service_groups:
        logger.info("[INFO] No service groups configured")
    else:
        stopped_count = 0
        for group in service_groups:
            group_name = group.get("name", "default")
            pid = read_pid(group_name)

            if not pid:
                logger.info(f"[INFO] Service group '{group_name}' has no PID file")
                continue

            if _stop_process(f"service group '{group_name}'", pid, get_pid_file_path(group_name), force):
                stopped_count += 1

        logger.info(f"[INFO] Stopped {stopped_count}/{len(service_groups)} service groups")

    # Stop all HLS downloaders
    downloader_pids = get_all_hls_downloader_pids()
    downloader_stopped = 0
    for index_code, pid_file in downloader_pids:
        pid = read_hls_downloader_pid(index_code)
        if not pid:
            continue
        if _stop_process(f"HLS downloader '{index_code}'", pid, pid_file, force):
            downloader_stopped += 1

    if downloader_pids:
        logger.info(f"[INFO] Stopped {downloader_stopped}/{len(downloader_pids)} HLS downloaders")

    return True


def restart_service():
    """Stop everything, pause briefly, then start again; returns start_service()'s result."""
    logger.info("[INFO] Restarting all service groups...")
    stop_service()
    time.sleep(2)
    return start_service()


def main():
    """Command-line entry point: dispatch start / stop / restart / status."""
    parser = argparse.ArgumentParser(description="RTSP Service Manager")
    parser.add_argument(
        "command",
        nargs="?",
        choices=["start", "stop", "restart", "status"],
        default="start",
        help="Command to execute: start, stop, restart, status (default: start)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force stop (send SIGKILL immediately)",
    )

    args = parser.parse_args()

    if args.command == "start":
        success = start_service()
        sys.exit(0 if success else 1)
    elif args.command == "stop":
        success = stop_service(force=args.force)
        sys.exit(0 if success else 1)
    elif args.command == "restart":
        success = restart_service()
        sys.exit(0 if success else 1)
    elif args.command == "status":
        status_service()
        sys.exit(0)


if __name__ == "__main__":
    main()