Files
SupervisorAI/main_start.py
2026-02-27 12:05:44 +08:00

307 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

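# main_start.py
# Main launcher: reads config.yaml and starts one service process per service group via subprocess
# (rtsp_service_ws_kadian.py by default, hls_service_ws_kadian.py for HLS sources).
# Supports the start, stop, restart and status commands; start is the default.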
import json
import yaml
import base64
import subprocess
import sys
import os
import signal
import argparse
import time
from typing import List, Dict
from common.camera_config import CameraConfig
from utils.logger import get_logger
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Directory holding one PID file per service group (relative path, so it
# resolves against the current working directory).
PID_DIR = "pids"
def load_debug_mode(config_path: str = "config.yaml") -> bool:
    """Read the ``debug_mode`` setting from a YAML configuration file.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        The value stored under ``debug_mode`` (``False`` when the key is
        absent), or ``False`` if the file cannot be read or parsed for any
        reason.
    """
    try:
        with open(config_path, "r", encoding="utf-8") as stream:
            settings = yaml.safe_load(stream)
        debug_flag = settings.get("debug_mode", False)
    except Exception:
        # Deliberately broad: any problem with the config means "not debugging".
        debug_flag = False
    return debug_flag
# Debug mode flag: read from the configuration file once, when the module is loaded.
DEBUG_MODE = load_debug_mode()
def load_service_groups_from_yaml(config_path: str = "config.yaml") -> List[dict]:
    """Load the ``service_groups`` list from a YAML configuration file.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        The value stored under ``service_groups``. An empty list is returned
        when the file is empty, when the key is missing, or when the key is
        present but has no value.

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
        yaml.YAMLError: If the file is not valid YAML.
    """
    with open(config_path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    # safe_load() yields None for an empty document; treat that as "no groups"
    # instead of failing on None.get().
    if cfg is None:
        return []
    # A bare `service_groups:` key parses to None; normalise it to a list so
    # callers can always iterate and take len().
    return cfg.get("service_groups") or []
def cameras_to_base64_json(cameras: List[dict]) -> str:
    """Serialise camera configurations to JSON and encode the result as base64.

    Args:
        cameras: Camera configuration entries; must be JSON-serialisable.

    Returns:
        ASCII base64 text of the UTF-8 encoded JSON document. Non-ASCII
        characters are kept as-is in the JSON (``ensure_ascii=False``) before
        encoding.
    """
    utf8_payload = json.dumps(cameras, ensure_ascii=False).encode("utf-8")
    encoded = base64.b64encode(utf8_payload)
    return str(encoded, "ascii")
def get_pid_file_path(group_name: str) -> str:
    """Build the path of the PID file that belongs to a service group.

    Args:
        group_name: Name of the service group.

    Returns:
        ``<PID_DIR>/<group_name>.pid``.
    """
    pid_filename = "{}.pid".format(group_name)
    return os.path.join(PID_DIR, pid_filename)
def ensure_pid_dir():
    """Create the PID directory if it does not exist yet.

    Raises:
        OSError: If the directory cannot be created, including the case where
            ``PID_DIR`` already exists but is not a directory.
    """
    # exist_ok=True avoids the check-then-create race of a separate
    # os.path.exists() test when two launchers run at the same time.
    os.makedirs(PID_DIR, exist_ok=True)
def save_pid(pid: int, group_name: str):
    """Write a process ID to the PID file of a service group.

    The PID directory is created first if needed. Failures while writing the
    file are logged and not raised.

    Args:
        pid: Process ID to record.
        group_name: Name of the service group the process belongs to.
    """
    ensure_pid_dir()
    pid_file = get_pid_file_path(group_name)
    try:
        with open(pid_file, "w") as handle:
            handle.write(f"{pid}")
        logger.info(f"[INFO] Saved PID {pid} to {pid_file}")
    except Exception as e:
        # Best effort: the service keeps running even if its PID was not recorded.
        logger.error(f"[ERROR] Failed to save PID file: {e}")
def read_pid(group_name: str):
    """Read the recorded process ID of a service group.

    Args:
        group_name: Name of the service group.

    Returns:
        The PID as an ``int``, or ``None`` when the PID file does not exist
        or cannot be read/parsed (the latter case is logged as an error).
    """
    pid_file = get_pid_file_path(group_name)
    recorded_pid = None
    try:
        with open(pid_file, "r") as handle:
            contents = handle.read()
        recorded_pid = int(contents.strip())
    except FileNotFoundError:
        # No PID file simply means nothing was recorded; not worth logging.
        pass
    except Exception as e:
        logger.error(f"[ERROR] Failed to read PID file: {e}")
    return recorded_pid
def is_process_running(pid: int) -> bool:
    """Check whether a process with the given PID can be signalled.

    Signal 0 is used, which performs the existence/permission check without
    delivering a signal.

    Args:
        pid: Process ID to probe. Must be a positive integer to be considered.

    Returns:
        ``True`` if ``os.kill(pid, 0)`` succeeds; ``False`` otherwise. Any
        ``OSError`` (including a permission error) is reported as
        "not running", as in the original implementation.
    """
    # Non-positive PIDs do not name a single process for kill(): 0 targets the
    # caller's process group and negative values target groups/all processes.
    # Such a value (e.g. from a corrupt PID file) must never count as "running",
    # otherwise callers would go on to signal unrelated processes.
    if not isinstance(pid, int) or pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except (OSError, OverflowError):
        # OverflowError: the value does not fit the platform's pid type.
        return False
    return True
def get_script_path(video_source_type: str) -> str:
    """Pick the service script that handles the given video source type.

    Args:
        video_source_type: Source type from the configuration.

    Returns:
        ``"hls_service_ws_kadian.py"`` for ``"hls"``; the RTSP script
        ``"rtsp_service_ws_kadian.py"`` for every other value.
    """
    is_hls = video_source_type == "hls"
    # Anything that is not explicitly HLS falls back to RTSP.
    return "hls_service_ws_kadian.py" if is_hls else "rtsp_service_ws_kadian.py"
def start_service_group(group: dict):
    """Launch the service script for one service group as a child process.

    Args:
        group: Service group configuration. Keys read here, with their
            defaults: ``cameras`` (``[]``), ``ws_host`` (``"0.0.0.0"``),
            ``ws_port`` (``8765``), ``algorithm`` (``""``), ``name``
            (``"default"``) and ``video_source_type`` (``"rtsp"``).

    Returns:
        A ``(process, group_name)`` tuple, where ``process`` is the
        ``subprocess.Popen`` object of the started child.
    """
    group_name = group.get("name", "default")
    video_source_type = group.get("video_source_type", "rtsp")
    ws_host = group.get("ws_host", "0.0.0.0")
    ws_port = group.get("ws_port", 8765)
    algorithm = group.get("algorithm", "")

    # The script is chosen by source type; the camera list travels as a single
    # base64 argument.
    cmd = [sys.executable, get_script_path(video_source_type)]
    cmd += ["--cameras", cameras_to_base64_json(group.get("cameras", []))]
    cmd += ["--ws-host", ws_host]
    cmd += ["--ws-port", str(ws_port)]
    cmd += ["--algorithm", algorithm]

    logger.info(f"[INFO] Starting service group '{group_name}': ws={ws_host}:{ws_port}, algorithm={algorithm}, source={video_source_type}")

    # With DEBUG_MODE on, the child stays in the launcher's session (foreground
    # debugging); otherwise it is started in a new session for deployment.
    process = subprocess.Popen(cmd, start_new_session=not DEBUG_MODE)
    return process, group_name
def start_service():
    """Start every configured service group that is not already running.

    Groups whose PID file points at a live process are skipped. After
    launching a child the function waits briefly and verifies that the child
    is still alive before recording its PID. When ``DEBUG_MODE`` is on, the
    function then blocks until all children it started have exited.

    Returns:
        ``True`` if at least one service group was started, ``False``
        otherwise (including when no groups are configured).
    """
    config_path = "config.yaml"

    # 1. Load the configuration. `or []` keeps len() below safe if the loader
    #    returns None for a `service_groups` key that has no value.
    service_groups = load_service_groups_from_yaml(config_path) or []
    logger.info(f"[INFO] Loaded {len(service_groups)} service groups from {config_path}")
    if not service_groups:
        logger.error("[ERROR] No service groups found in config, exiting...")
        return False

    # 2. Start each service group.
    started_count = 0
    processes = []  # children started here, waited on in debug mode
    for group in service_groups:
        group_name = group.get("name", "default")

        # Skip groups that are already running.
        pid = read_pid(group_name)
        if pid and is_process_running(pid):
            logger.warning(f"[WARN] Service group '{group_name}' is already running with PID {pid}")
            continue

        try:
            process, name = start_service_group(group)
            time.sleep(0.5)
            # A child that already exited during the grace period did not
            # start successfully; do not record its PID or count it.
            exit_code = process.poll()
            if exit_code is not None:
                logger.error(f"[ERROR] Service group '{name}' exited during startup with code {exit_code}")
                continue
            save_pid(process.pid, name)
            logger.info(f"[INFO] Service group '{name}' started with PID {process.pid}")
            processes.append((process, name))
            started_count += 1
        except Exception as e:
            logger.error(f"[ERROR] Failed to start service group '{group_name}': {e}")

    logger.info(f"[INFO] Started {started_count}/{len(service_groups)} service groups")

    # In debug mode the launcher stays in the foreground until all children exit.
    if DEBUG_MODE and processes:
        logger.info("[DEBUG] Running in foreground mode, waiting for child processes...")
        for process, _name in processes:
            process.wait()

    return started_count > 0
def status_service():
    """Log the running state of every configured service group.

    PID files that point at a process which is no longer running are removed
    as a side effect.

    Returns:
        ``True`` if at least one service group is running, ``False``
        otherwise (including when no groups are configured).
    """
    config_path = "config.yaml"
    service_groups = load_service_groups_from_yaml(config_path)
    if not service_groups:
        logger.info("[INFO] No service groups configured")
        return False

    running_count = 0
    for group in service_groups:
        group_name = group.get("name", "default")
        pid = read_pid(group_name)
        if pid and is_process_running(pid):
            logger.info(f"[INFO] Service group '{group_name}' is running with PID {pid}")
            running_count += 1
            continue

        logger.info(f"[INFO] Service group '{group_name}' is not running")
        # Clean up the stale PID file, if there was one.
        if pid:
            try:
                os.remove(get_pid_file_path(group_name))
            except OSError:
                # Best-effort cleanup; only filesystem errors are ignored so
                # that KeyboardInterrupt/SystemExit still propagate.
                pass

    logger.info(f"[INFO] {running_count}/{len(service_groups)} service groups running")
    return running_count > 0
def _remove_pid_file(group_name: str) -> None:
    """Delete the PID file of a service group, ignoring filesystem errors."""
    try:
        os.remove(get_pid_file_path(group_name))
    except OSError:
        # Best-effort cleanup: a missing or undeletable PID file must not
        # abort the stop sequence.
        pass


def stop_service(force=False):
    """Stop every configured service group that has a PID file.

    Without ``force`` each process receives SIGTERM and is given up to ten
    seconds to exit before SIGKILL is sent. With ``force`` SIGKILL is sent
    immediately. PID files are removed for stopped groups and for groups
    whose recorded process is no longer running.

    Args:
        force: Send SIGKILL right away instead of attempting SIGTERM first.

    Returns:
        Always ``True``; per-group failures are logged, not reported through
        the return value.
    """
    config_path = "config.yaml"
    service_groups = load_service_groups_from_yaml(config_path)
    if not service_groups:
        logger.info("[INFO] No service groups configured")
        return True

    stopped_count = 0
    for group in service_groups:
        group_name = group.get("name", "default")
        pid = read_pid(group_name)
        if not pid:
            logger.info(f"[INFO] Service group '{group_name}' has no PID file")
            continue

        if not is_process_running(pid):
            logger.warning(f"[WARN] Service group '{group_name}' PID {pid} not running, cleaning up")
            _remove_pid_file(group_name)
            stopped_count += 1
            continue

        try:
            if force:
                logger.info(f"[INFO] Force killing service group '{group_name}' (PID {pid})")
                os.kill(pid, signal.SIGKILL)
            else:
                logger.info(f"[INFO] Stopping service group '{group_name}' (PID {pid})")
                os.kill(pid, signal.SIGTERM)
                # Wait up to ten seconds for a graceful exit.
                for _ in range(10):
                    if not is_process_running(pid):
                        break
                    time.sleep(1)
                if is_process_running(pid):
                    logger.warning(f"[WARN] Force killing service group '{group_name}'")
                    os.kill(pid, signal.SIGKILL)
                    time.sleep(1)
        except ProcessLookupError:
            # The process exited between the liveness check and the signal:
            # it is already stopped, so fall through to the cleanup below.
            pass
        except Exception as e:
            logger.error(f"[ERROR] Failed to stop service group '{group_name}': {e}")
            continue

        # Remove the PID file now that the process has been signalled.
        _remove_pid_file(group_name)
        logger.info(f"[INFO] Service group '{group_name}' stopped")
        stopped_count += 1

    logger.info(f"[INFO] Stopped {stopped_count}/{len(service_groups)} service groups")
    return True
def restart_service():
    """Stop all service groups, pause for two seconds, then start them again.

    Returns:
        The result of ``start_service()``.
    """
    logger.info("[INFO] Restarting all service groups...")
    stop_service()
    # Pause between stopping and starting, as the original sequence does.
    time.sleep(2)
    restarted = start_service()
    return restarted
def main():
    """Parse the command line and run the requested service command.

    The process exits with status 0 when the command reports success and 1
    otherwise; ``status`` always exits with 0.
    """
    parser = argparse.ArgumentParser(description="RTSP Service Manager")
    parser.add_argument(
        "command",
        nargs="?",
        choices=["start", "stop", "restart", "status"],
        default="start",
        help="Command to execute: start, stop, restart, status (default: start)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force stop (send SIGKILL immediately)",
    )
    args = parser.parse_args()

    # `status` only reports; its result does not influence the exit code.
    if args.command == "status":
        status_service()
        sys.exit(0)

    # Remaining commands map to a callable whose truthiness is the exit status.
    handlers = {
        "start": start_service,
        "stop": lambda: stop_service(force=args.force),
        "restart": restart_service,
    }
    handler = handlers.get(args.command)
    if handler is not None:
        success = handler()
        sys.exit(0 if success else 1)


# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()