Files
SupervisorAI/main_start.py

682 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

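# main_start.py
# Main launcher: reads config.yaml and uses subprocess to start one service
# process per service group (rtsp_service_ws_kadian.py or
# hls_service_ws_kadian.py, chosen by the group's video_source_type), plus the
# HLS downloader and HLS cleanup helper processes.
# Supports the start, stop, restart and status commands; defaults to start.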
import json
import yaml
import base64
import subprocess
import sys
import os
import signal
import argparse
import time
import glob
from typing import List, Dict, Optional
from common.camera_config import CameraConfig
from utils.logger import get_logger
# Directory holding every PID file written by this launcher (a relative path,
# so it is resolved against the current working directory).
PID_DIR = "pids"
# File-name prefix of the per-camera HLS downloader PID files:
# <HLS_DOWNLOADER_PID_PREFIX><index_code>.pid
HLS_DOWNLOADER_PID_PREFIX = "hls_downloader_"
# File name of the single PID file tracking the HLS cleanup process.
HLS_CLEANUP_PID_FILE = "hls_cleanup.pid"

logger = get_logger(__name__)
def load_debug_mode(config_path: str = "config.yaml") -> bool:
    """Read the ``debug_mode`` flag from the YAML configuration file.

    Args:
        config_path: Path of the YAML configuration file.

    Returns:
        The flag coerced to ``bool``. Returns False when the file is missing,
        empty, not a mapping, or cannot be read or parsed.
    """
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f)
    except FileNotFoundError:
        # No config file simply means "not in debug mode".
        return False
    except Exception as e:
        # Best-effort: never abort start-up over this flag, but make a broken
        # config visible instead of silently switching debug mode off.
        logger.warning(f"[WARN] Failed to read debug_mode from {config_path}: {e}")
        return False
    if not isinstance(cfg, dict):
        # An empty file (safe_load returns None) or a non-mapping document.
        return False
    return bool(cfg.get("debug_mode", False))
def load_downloader_debug_mode(config_path: str = "config.yaml") -> bool:
    """Read the ``downloader_debug_mode`` flag from the YAML configuration file.

    Args:
        config_path: Path of the YAML configuration file.

    Returns:
        The flag coerced to ``bool``. Returns False when the file is missing,
        empty, not a mapping, or cannot be read or parsed.
    """
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f)
    except FileNotFoundError:
        # No config file simply means "not in downloader debug mode".
        return False
    except Exception as e:
        # Best-effort: never abort start-up over this flag, but make a broken
        # config visible instead of silently switching the flag off.
        logger.warning(f"[WARN] Failed to read downloader_debug_mode from {config_path}: {e}")
        return False
    if not isinstance(cfg, dict):
        # An empty file (safe_load returns None) or a non-mapping document.
        return False
    return bool(cfg.get("downloader_debug_mode", False))
# Debug switches, read once from config.yaml when this module is imported.
# DEBUG_MODE: service groups are launched attached to this terminal and
# start_service() waits for them; otherwise they are detached.
DEBUG_MODE = load_debug_mode("config.yaml")
# DOWNLOADER_DEBUG_MODE: the same behaviour for the HLS downloader/cleanup processes.
DOWNLOADER_DEBUG_MODE = load_downloader_debug_mode("config.yaml")
# Round-robin counter: number of device IDs handed out so far.
device_counter = 0


def get_next_device(total_devices: int) -> int:
    """Return the next device ID, cycling round-robin through 0..total_devices-1.

    Args:
        total_devices: Number of devices available; must be >= 1.

    Returns:
        A device ID in the range ``[0, total_devices)``.

    Raises:
        ValueError: If ``total_devices`` is less than 1. The counter is not
            advanced in that case.
    """
    global device_counter
    if total_devices < 1:
        # 0 would raise ZeroDivisionError. A negative modulus yields negative
        # IDs, which the caller would export as ASCEND_RT_VISIBLE_DEVICES.
        raise ValueError(f"total_devices must be >= 1, got {total_devices}")
    device_id = device_counter % total_devices
    device_counter += 1
    return device_id
def load_service_groups_from_yaml(config_path: str = "config.yaml") -> List[dict]:
    """Load the ``service_groups`` list from the YAML configuration file.

    Args:
        config_path: Path of the YAML configuration file.

    Returns:
        The configured service groups. Returns an empty list when the file is
        empty, is not a mapping, or ``service_groups`` is absent or null.

    Raises:
        Errors from ``open()`` and ``yaml.safe_load`` (for example
        FileNotFoundError or yaml.YAMLError) propagate unchanged.
    """
    with open(config_path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    if not isinstance(cfg, dict):
        # An empty file parses as None; there is nothing to read from it.
        return []
    # A bare "service_groups:" parses as None; callers call len() on the result.
    return cfg.get("service_groups") or []
def load_hls_config(config_path: str = "config.yaml") -> dict:
    """Load the HLS downloader/cleanup settings and device count from the YAML config.

    Returned key <- config.yaml key (default):
        hls_root_path     <- hls_root_path ("")
        daily_rotate_hour <- hls_downloader_daily_rotate_hour (3)
        retention_days    <- hls_downloader_retention_days (3)
        retry_interval    <- hls_downloader_retry_interval_seconds (10)
        total_devices     <- total_devices (1)

    Missing or null keys fall back to their defaults. If the file cannot be
    read or parsed, or is not a mapping, an error is logged and all defaults
    are returned. An empty file silently yields the defaults.

    Args:
        config_path: Path of the YAML configuration file.

    Returns:
        A new dict with exactly the five keys listed above.
    """
    # returned key -> (config.yaml key, default value)
    settings = {
        "hls_root_path": ("hls_root_path", ""),
        "daily_rotate_hour": ("hls_downloader_daily_rotate_hour", 3),
        "retention_days": ("hls_downloader_retention_days", 3),
        "retry_interval": ("hls_downloader_retry_interval_seconds", 10),
        "total_devices": ("total_devices", 1),
    }
    cfg = {}
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            loaded = yaml.safe_load(f)
    except Exception as e:
        logger.error(f"[ERROR] Failed to load HLS config: {e}")
    else:
        if isinstance(loaded, dict):
            cfg = loaded
        elif loaded is not None:
            logger.error(f"[ERROR] Failed to load HLS config: top level of {config_path} is not a mapping")

    result = {}
    for out_key, (cfg_key, default) in settings.items():
        value = cfg.get(cfg_key)
        # "key:" with no value parses as None. Treat it like a missing key so
        # callers never see None (e.g. in `total_devices > 1` or in argv lists).
        result[out_key] = default if value is None else value
    return result
def cameras_to_base64_json(cameras: List[dict]) -> str:
    """Serialise the camera list to JSON and wrap it in base64.

    Non-ASCII text (e.g. Chinese camera names) is kept as raw UTF-8 inside the
    JSON (``ensure_ascii=False``) before encoding. The result is a plain ASCII
    string that is safe to pass as a single command-line argument.
    """
    payload = json.dumps(cameras, ensure_ascii=False).encode("utf-8")
    return str(base64.b64encode(payload), "ascii")
def get_pid_file_path(group_name: str) -> str:
    """Return the path of the PID file that tracks service group *group_name*.

    The file is ``<group_name>.pid`` inside PID_DIR.
    """
    file_name = "{}.pid".format(group_name)
    return os.path.join(PID_DIR, file_name)
def get_hls_downloader_pid_file_path(index_code: str) -> str:
    """Return the PID file path for the HLS downloader of camera *index_code*.

    The file is ``<HLS_DOWNLOADER_PID_PREFIX><index_code>.pid`` inside PID_DIR.
    """
    file_name = "{}{}.pid".format(HLS_DOWNLOADER_PID_PREFIX, index_code)
    return os.path.join(PID_DIR, file_name)
def ensure_pid_dir():
    """Create the PID directory (PID_DIR) if nothing exists at that path yet."""
    if not os.path.exists(PID_DIR):
        # exist_ok=True: another launcher invocation may create the directory
        # between the check and makedirs(). Without it that race raises
        # FileExistsError.
        os.makedirs(PID_DIR, exist_ok=True)
def save_pid(pid: int, group_name: str):
    """Write *pid* into the PID file of service group *group_name*.

    The PID directory is created first if needed. Write failures are logged
    rather than raised.
    """
    ensure_pid_dir()
    target_path = get_pid_file_path(group_name)
    try:
        with open(target_path, "w") as handle:
            handle.write(str(pid))
    except Exception as exc:
        logger.error(f"[ERROR] Failed to save PID file: {exc}")
    else:
        logger.info(f"[INFO] Saved PID {pid} to {target_path}")
def save_hls_downloader_pid(pid: int, index_code: str):
    """Write *pid* into the PID file of the HLS downloader for camera *index_code*.

    The PID directory is created first if needed. Write failures are logged
    rather than raised.
    """
    ensure_pid_dir()
    target_path = get_hls_downloader_pid_file_path(index_code)
    try:
        with open(target_path, "w") as handle:
            handle.write(str(pid))
    except Exception as exc:
        logger.error(f"[ERROR] Failed to save HLS downloader PID file: {exc}")
    else:
        logger.info(f"[INFO] Saved HLS downloader PID {pid} to {target_path}")
def read_pid(group_name: str) -> Optional[int]:
    """Read the PID recorded for service group *group_name*.

    Args:
        group_name: Name of the service group.

    Returns:
        The recorded PID as a positive int. Returns None when the PID file does
        not exist, cannot be read or parsed, or holds a non-positive number
        (every case except a missing file is logged).
    """
    pid_file = get_pid_file_path(group_name)
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except FileNotFoundError:
        return None
    except Exception as e:
        logger.error(f"[ERROR] Failed to read PID file: {e}")
        return None
    if pid <= 0:
        # os.kill() treats 0 as "my process group" and -1 as "every process I
        # may signal". Never hand such a value to the stop logic.
        logger.error(f"[ERROR] Invalid PID {pid} in {pid_file}")
        return None
    return pid
def read_hls_downloader_pid(index_code: str) -> Optional[int]:
    """Read the PID recorded for the HLS downloader of camera *index_code*.

    Args:
        index_code: Camera index code used in the PID file name.

    Returns:
        The recorded PID as a positive int. Returns None when the PID file does
        not exist, cannot be read or parsed, or holds a non-positive number
        (every case except a missing file is logged).
    """
    pid_file = get_hls_downloader_pid_file_path(index_code)
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except FileNotFoundError:
        return None
    except Exception as e:
        logger.error(f"[ERROR] Failed to read HLS downloader PID file: {e}")
        return None
    if pid <= 0:
        # os.kill() treats 0 as "my process group" and -1 as "every process I
        # may signal". Never hand such a value to the stop logic.
        logger.error(f"[ERROR] Invalid HLS downloader PID {pid} in {pid_file}")
        return None
    return pid
def get_hls_cleanup_pid_file_path() -> str:
    """Return the path of the PID file that tracks the HLS cleanup process."""
    directory, file_name = PID_DIR, HLS_CLEANUP_PID_FILE
    return os.path.join(directory, file_name)
def save_hls_cleanup_pid(pid: int):
    """Write *pid* into the HLS cleanup process PID file.

    The PID directory is created first if needed. Write failures are logged
    rather than raised.
    """
    ensure_pid_dir()
    target_path = get_hls_cleanup_pid_file_path()
    try:
        with open(target_path, "w") as handle:
            handle.write(str(pid))
    except Exception as exc:
        logger.error(f"[ERROR] Failed to save HLS cleanup PID file: {exc}")
    else:
        logger.info(f"[INFO] Saved HLS cleanup PID {pid} to {target_path}")
def read_hls_cleanup_pid() -> Optional[int]:
    """Read the PID recorded for the HLS cleanup process.

    Returns:
        The recorded PID as a positive int. Returns None when the PID file does
        not exist, cannot be read or parsed, or holds a non-positive number
        (every case except a missing file is logged).
    """
    pid_file = get_hls_cleanup_pid_file_path()
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except FileNotFoundError:
        return None
    except Exception as e:
        logger.error(f"[ERROR] Failed to read HLS cleanup PID file: {e}")
        return None
    if pid <= 0:
        # os.kill() treats 0 as "my process group" and -1 as "every process I
        # may signal". Never hand such a value to the stop logic.
        logger.error(f"[ERROR] Invalid HLS cleanup PID {pid} in {pid_file}")
        return None
    return pid
def is_process_running(pid: int) -> bool:
    """Return True if a process with this PID currently exists.

    Probes with ``os.kill(pid, 0)``, which runs the existence and permission
    checks without delivering a signal.

    Args:
        pid: Process ID to probe. Non-positive values return False right away.
            ``os.kill`` treats 0 as "my process group" and -1 as "every process
            I may signal", so probing them would report a false positive.

    Returns:
        True if the process exists, including one owned by another user that
        we may not signal. False otherwise.
    """
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        # ESRCH: no such process.
        return False
    except PermissionError:
        # EPERM: the process exists but belongs to someone else. Reporting it
        # as "not running" would delete its PID file and start a duplicate.
        return True
    except OSError:
        return False
    return True
def get_script_path(video_source_type: str) -> str:
    """Map a group's video source type to the service script that handles it.

    Only the exact value ``"hls"`` selects the HLS service. Every other value
    (including ``"rtsp"`` and unknown types) falls back to the RTSP service.
    """
    is_hls = video_source_type == "hls"
    return "hls_service_ws_kadian.py" if is_hls else "rtsp_service_ws_kadian.py"
def start_service_group(group: dict, hls_config: dict = None, device_id: int = None):
    """Launch the service script for one service group as a child process.

    The script is selected from the group's ``video_source_type`` (via
    get_script_path). It receives the group's cameras as base64-encoded JSON
    plus its WebSocket host/port and algorithm.

    Args:
        group: One ``service_groups`` entry from config.yaml. Keys read:
            ``cameras``, ``ws_host``, ``ws_port``, ``algorithm``, ``name``,
            ``video_source_type``.
        hls_config: Settings from load_hls_config(). Only ``hls_root_path`` is
            used, and only for HLS groups.
        device_id: When not None, exported to the child as
            ``ASCEND_RT_VISIBLE_DEVICES``.

    Returns:
        ``(process, group_name)``: the ``subprocess.Popen`` handle and the
        group's name.
    """
    cameras = group.get("cameras", [])
    ws_host = group.get("ws_host", "0.0.0.0")
    ws_port = group.get("ws_port", 8765)
    algorithm = group.get("algorithm", "")
    group_name = group.get("name", "default")
    video_source_type = group.get("video_source_type", "rtsp")

    cmd = [
        sys.executable,
        get_script_path(video_source_type),
        "--cameras", cameras_to_base64_json(cameras),
        "--ws-host", ws_host,
        "--ws-port", str(ws_port),
        "--algorithm", algorithm,
    ]
    # HLS services also need to know where the HLS files live.
    if video_source_type == "hls" and hls_config:
        cmd.extend(["--hls-root-path", hls_config.get("hls_root_path", "")])

    env = os.environ.copy()
    message = (
        f"[INFO] Starting service group '{group_name}': ws={ws_host}:{ws_port}, "
        f"algorithm={algorithm}, source={video_source_type}"
    )
    if device_id is not None:
        env["ASCEND_RT_VISIBLE_DEVICES"] = str(device_id)
        message += f", device={device_id}"
    logger.info(message)

    # DEBUG_MODE: stay attached to this terminal for easier debugging.
    # Otherwise: start in a new session (setsid) so the child is detached.
    # env is passed in BOTH modes so the device assignment also applies while
    # debugging (it used to be dropped in debug mode).
    if DEBUG_MODE:
        process = subprocess.Popen(cmd, env=env)
    else:
        process = subprocess.Popen(cmd, env=env, start_new_session=True)
    return process, group_name
def start_hls_downloader(camera: dict, hls_config: dict) -> Optional[subprocess.Popen]:
    """Launch ``hls_downloader.py`` for one camera.

    Args:
        camera: Camera entry from a service group. Reads ``index`` (required),
            ``name`` and ``id``.
        hls_config: Settings from load_hls_config(). Reads ``hls_root_path``,
            ``daily_rotate_hour`` and ``retry_interval``.

    Returns:
        The ``subprocess.Popen`` handle. Returns None when the camera has no
        index, or when its PID file shows a downloader already running (both
        cases are logged).
    """
    raw_index = camera.get("index", "")
    raw_name = camera.get("name", "")
    # YAML turns unquoted values such as `index: 123456` into ints, but every
    # subprocess argv entry must be a str, so normalise here (None -> "").
    index_code = "" if raw_index is None else str(raw_index)
    camera_name = "" if raw_name is None else str(raw_name)
    camera_id = camera.get("id", 0)
    if not index_code:
        logger.warning("[WARN] Camera has no index_code, skipping HLS downloader")
        return None

    # A live PID from the PID file means a downloader for this camera is already up.
    pid = read_hls_downloader_pid(index_code)
    if pid and is_process_running(pid):
        logger.warning(f"[WARN] HLS downloader for '{index_code}' is already running with PID {pid}")
        return None

    cmd = [
        sys.executable,
        "hls_downloader.py",
        "--index-code", index_code,
        "--camera-name", camera_name,
        "--camera-id", str(camera_id),
        "--hls-root-path", hls_config.get("hls_root_path", ""),
        "--rotate-hour", str(hls_config.get("daily_rotate_hour", 3)),
        "--retry-interval", str(hls_config.get("retry_interval", 10)),
    ]
    logger.info(f"[INFO] Starting HLS downloader for camera '{camera_name}' (index: {index_code})")
    # DOWNLOADER_DEBUG_MODE: stay attached to this terminal. Otherwise start
    # in a new session so the child is detached.
    if DOWNLOADER_DEBUG_MODE:
        process = subprocess.Popen(cmd)
    else:
        process = subprocess.Popen(cmd, start_new_session=True)
    return process
def start_hls_cleanup(hls_config: dict) -> Optional[subprocess.Popen]:
    """Launch ``hls_cleanup.py`` for the configured HLS root directory.

    Args:
        hls_config: Settings from load_hls_config(). Reads ``hls_root_path``
            and ``retention_days``.

    Returns:
        The ``subprocess.Popen`` handle. Returns None when no HLS root path is
        configured, or when the PID file shows a cleanup process already
        running (both cases are logged).
    """
    root_path = hls_config.get("hls_root_path", "")
    keep_days = hls_config.get("retention_days", 3)
    if not root_path:
        logger.warning("[WARN] HLS root path not configured, skipping HLS cleanup")
        return None

    existing_pid = read_hls_cleanup_pid()
    if existing_pid and is_process_running(existing_pid):
        logger.warning(f"[WARN] HLS cleanup is already running with PID {existing_pid}")
        return None

    logger.info(f"[INFO] Starting HLS cleanup service (retention: {keep_days} days)")
    # DOWNLOADER_DEBUG_MODE keeps the child attached; otherwise it is detached
    # into a new session.
    popen_kwargs = {} if DOWNLOADER_DEBUG_MODE else {"start_new_session": True}
    return subprocess.Popen(
        [
            sys.executable,
            "hls_cleanup.py",
            "--hls-root-path", root_path,
            "--retention-days", str(keep_days),
            "--interval", "1",  # per the original author's note: clean up once per hour
        ],
        **popen_kwargs,
    )
def _child_survived(process: subprocess.Popen, settle_seconds: float) -> bool:
    """Sleep *settle_seconds*, then return True if *process* has not exited yet."""
    time.sleep(settle_seconds)
    return process.poll() is None


def _start_group_downloaders(cameras: List[dict], hls_config: dict) -> List[tuple]:
    """Start an HLS downloader for every camera that has an index; return [(Popen, index_code), ...]."""
    started = []
    for camera in cameras or []:
        index_code = camera.get("index", "")
        if not index_code:
            continue
        try:
            downloader_proc = start_hls_downloader(camera, hls_config)
            if not downloader_proc:
                continue  # skipped; start_hls_downloader logged the reason
            if not _child_survived(downloader_proc, 0.3):
                # Do not record a PID for a child that already died.
                logger.error(f"[ERROR] HLS downloader for '{index_code}' exited immediately with code {downloader_proc.returncode}")
                continue
            save_hls_downloader_pid(downloader_proc.pid, index_code)
            logger.info(f"[INFO] HLS downloader for '{index_code}' started with PID {downloader_proc.pid}")
            started.append((downloader_proc, index_code))
        except Exception as e:
            logger.error(f"[ERROR] Failed to start HLS downloader for '{index_code}': {e}")
    return started


def _start_cleanup_process(hls_config: dict) -> Optional[subprocess.Popen]:
    """Start the HLS cleanup process when hls_root_path is set; return its Popen, or None."""
    if not hls_config.get("hls_root_path"):
        return None
    try:
        cleanup_process = start_hls_cleanup(hls_config)
        if not cleanup_process:
            return None
        if not _child_survived(cleanup_process, 0.3):
            logger.error(f"[ERROR] HLS cleanup service exited immediately with code {cleanup_process.returncode}")
            return None
        save_hls_cleanup_pid(cleanup_process.pid)
        logger.info(f"[INFO] HLS cleanup service started with PID {cleanup_process.pid}")
        return cleanup_process
    except Exception as e:
        logger.error(f"[ERROR] Failed to start HLS cleanup service: {e}")
        return None


def start_service():
    """Start every configured service group, its HLS downloaders, and HLS cleanup.

    Steps:
      1. Read ``service_groups`` and the HLS settings from config.yaml.
      2. Start each service group whose PID file does not point at a live
         process. When ``total_devices > 1``, groups are spread over devices
         round-robin. For HLS groups (with ``hls_root_path`` set), start one
         downloader per camera.
      3. Start the HLS cleanup process when ``hls_root_path`` is set.
      4. In DEBUG_MODE / DOWNLOADER_DEBUG_MODE, block until the corresponding
         children exit.

    A child that exits within its short settle delay is reported as a failure.
    Its PID is not recorded.

    Returns:
        True if at least one service group was started by this call.
    """
    config_path = "config.yaml"

    # 1. Load configuration. A bare "service_groups:" parses as None, so
    #    normalise to a list before len() is called on it.
    service_groups = load_service_groups_from_yaml(config_path) or []
    hls_config = load_hls_config(config_path)
    total_devices = hls_config.get("total_devices", 1)
    logger.info(f"[INFO] Loaded {len(service_groups)} service groups from {config_path}")
    logger.info(f"[INFO] HLS config: root_path={hls_config.get('hls_root_path')}, rotate_hour={hls_config.get('daily_rotate_hour')}, total_devices={total_devices}")
    if not service_groups:
        logger.error("[ERROR] No service groups found in config, exiting...")
        return False

    # 2. Start the service groups. HLS downloaders are checked for every HLS
    #    group, even when the group itself was already running.
    processes = []  # (Popen, group name); waited on in DEBUG_MODE
    downloader_processes = []  # (Popen, index code); waited on in DOWNLOADER_DEBUG_MODE
    for group in service_groups:
        group_name = group.get("name", "default")
        pid = read_pid(group_name)
        if pid and is_process_running(pid):
            logger.warning(f"[WARN] Service group '{group_name}' is already running with PID {pid}")
        else:
            try:
                # Round-robin device assignment only matters with several devices.
                device_id = get_next_device(total_devices) if total_devices > 1 else None
                process, name = start_service_group(group, hls_config, device_id=device_id)
                if _child_survived(process, 0.5):
                    save_pid(process.pid, name)
                    logger.info(f"[INFO] Service group '{name}' started with PID {process.pid}")
                    processes.append((process, name))
                else:
                    # Do not record a PID (or count a success) for a child that already died.
                    logger.error(f"[ERROR] Service group '{name}' exited immediately with code {process.returncode}")
            except Exception as e:
                logger.error(f"[ERROR] Failed to start service group '{group_name}': {e}")

        if group.get("video_source_type", "rtsp") == "hls" and hls_config.get("hls_root_path"):
            downloader_processes.extend(_start_group_downloaders(group.get("cameras", []), hls_config))

    started_count = len(processes)
    logger.info(f"[INFO] Started {started_count}/{len(service_groups)} service groups")
    logger.info(f"[INFO] Started {len(downloader_processes)} HLS downloaders")

    # 3. HLS cleanup process.
    cleanup_process = _start_cleanup_process(hls_config)

    # 4. In debug mode the children are attached to this terminal, so wait for them.
    if DEBUG_MODE and processes:
        logger.info("[DEBUG] Waiting for service processes...")
        for process, _name in processes:
            process.wait()
    if DOWNLOADER_DEBUG_MODE and downloader_processes:
        logger.info("[DEBUG] Waiting for downloader processes...")
        for process, _index_code in downloader_processes:
            process.wait()
    if DOWNLOADER_DEBUG_MODE and cleanup_process:
        logger.info("[DEBUG] Waiting for cleanup process...")
        cleanup_process.wait()
    return started_count > 0
def get_all_hls_downloader_pids() -> List[tuple]:
    """List every HLS downloader PID file currently present in PID_DIR.

    Returns:
        ``(index_code, pid_file_path)`` tuples, in the order ``glob.glob``
        yields them. The index code is the file name with the
        HLS_DOWNLOADER_PID_PREFIX prefix and the ``.pid`` suffix removed.
    """
    pattern = os.path.join(PID_DIR, f"{HLS_DOWNLOADER_PID_PREFIX}*.pid")
    prefix_len = len(HLS_DOWNLOADER_PID_PREFIX)
    suffix_len = len(".pid")
    return [
        (os.path.basename(path)[prefix_len:-suffix_len], path)
        for path in glob.glob(pattern)
    ]
def status_service():
    """Log the state of every service group, HLS downloader and the HLS cleanup process.

    PID files whose recorded process is no longer running are deleted as a
    side effect. Downloaders and cleanup are reported even when config.yaml
    defines no service groups.

    Returns:
        True if at least one configured service group is running. False
        otherwise, including when no service groups are configured.
    """
    def _discard_pid_file(path: str) -> None:
        """Best-effort removal of a stale PID file."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"[WARN] Failed to remove stale PID file {path}: {e}")

    config_path = "config.yaml"
    service_groups = load_service_groups_from_yaml(config_path) or []

    # Service groups.
    running_count = 0
    if not service_groups:
        logger.info("[INFO] No service groups configured")
    else:
        for group in service_groups:
            group_name = group.get("name", "default")
            pid = read_pid(group_name)
            if pid and is_process_running(pid):
                logger.info(f"[INFO] Service group '{group_name}' is running with PID {pid}")
                running_count += 1
            else:
                logger.info(f"[INFO] Service group '{group_name}' is not running")
                if pid:
                    _discard_pid_file(get_pid_file_path(group_name))
        logger.info(f"[INFO] {running_count}/{len(service_groups)} service groups running")

    # HLS downloaders (discovered from their PID files, not from the config).
    downloader_pids = get_all_hls_downloader_pids()
    downloader_running = 0
    for index_code, pid_file in downloader_pids:
        pid = read_hls_downloader_pid(index_code)
        if pid and is_process_running(pid):
            logger.info(f"[INFO] HLS downloader '{index_code}' is running with PID {pid}")
            downloader_running += 1
        else:
            logger.info(f"[INFO] HLS downloader '{index_code}' is not running")
            if pid:
                _discard_pid_file(pid_file)
    if downloader_pids:
        logger.info(f"[INFO] {downloader_running}/{len(downloader_pids)} HLS downloaders running")

    # HLS cleanup process.
    cleanup_pid = read_hls_cleanup_pid()
    if cleanup_pid and is_process_running(cleanup_pid):
        logger.info(f"[INFO] HLS cleanup service is running with PID {cleanup_pid}")
    else:
        logger.info("[INFO] HLS cleanup service is not running")
        if cleanup_pid:
            _discard_pid_file(get_hls_cleanup_pid_file_path())

    return running_count > 0
def _remove_pid_file(pid_file: str) -> None:
    """Delete a PID file, tolerating its absence and logging any other OS error."""
    try:
        os.remove(pid_file)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning(f"[WARN] Failed to remove PID file {pid_file}: {e}")


def _terminate_process(pid: int, label: str, force: bool, grace_seconds: int = 10) -> None:
    """Signal *pid* to exit and escalate to SIGKILL if it outlives *grace_seconds*.

    Args:
        pid: Process to stop.
        label: Lower-case description used in log messages, e.g.
            ``"service group 'x'"``.
        force: Send SIGKILL immediately instead of SIGTERM.
        grace_seconds: How long (polled once per second) to wait before escalating.

    Raises:
        OSError other than ProcessLookupError (e.g. PermissionError) if the
        process cannot be signalled. A process that vanishes on its own is
        treated as stopped.
    """
    try:
        if force:
            logger.info(f"[INFO] Force killing {label} (PID {pid})")
            os.kill(pid, signal.SIGKILL)
        else:
            logger.info(f"[INFO] Stopping {label} (PID {pid})")
            os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        return  # exited between the liveness check and the signal
    for _ in range(grace_seconds):
        if not is_process_running(pid):
            return
        time.sleep(1)
    if is_process_running(pid):
        logger.warning(f"[WARN] Force killing {label}")
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            return
        time.sleep(1)


def _stop_tracked_process(pid: int, pid_file: str, label: str, force: bool) -> bool:
    """Stop one PID-file-tracked process and delete its PID file; return True on success."""
    # label starts lower-case ("service group 'x'"); sentence-initial messages capitalise it.
    title = label[:1].upper() + label[1:]
    if not is_process_running(pid):
        logger.warning(f"[WARN] {title} PID {pid} not running, cleaning up")
        _remove_pid_file(pid_file)
        return True
    try:
        _terminate_process(pid, label, force)
    except Exception as e:
        logger.error(f"[ERROR] Failed to stop {label}: {e}")
        return False
    _remove_pid_file(pid_file)
    logger.info(f"[INFO] {title} stopped")
    return True


def stop_service(force=False):
    """Stop all service groups, all HLS downloaders and the HLS cleanup process.

    Service groups are looked up by name from config.yaml. Downloaders are
    discovered from their PID files. Each process gets SIGTERM (or SIGKILL
    when *force* is set), up to 10 s to exit, then SIGKILL. PID files are
    removed once their process is gone. Downloaders and cleanup are stopped
    even when config.yaml lists no service groups.

    Args:
        force: Send SIGKILL immediately instead of SIGTERM.

    Returns:
        Always True. Individual failures are logged.
    """
    config_path = "config.yaml"
    service_groups = load_service_groups_from_yaml(config_path) or []

    # Service groups.
    if not service_groups:
        # Keep going: downloaders/cleanup started earlier must still be stopped.
        logger.info("[INFO] No service groups configured")
    else:
        stopped_count = 0
        for group in service_groups:
            group_name = group.get("name", "default")
            pid = read_pid(group_name)
            if not pid:
                logger.info(f"[INFO] Service group '{group_name}' has no PID file")
                continue
            if _stop_tracked_process(pid, get_pid_file_path(group_name), f"service group '{group_name}'", force):
                stopped_count += 1
        logger.info(f"[INFO] Stopped {stopped_count}/{len(service_groups)} service groups")

    # HLS downloaders.
    downloader_pids = get_all_hls_downloader_pids()
    downloader_stopped = 0
    for index_code, pid_file in downloader_pids:
        pid = read_hls_downloader_pid(index_code)
        if not pid:
            continue
        if _stop_tracked_process(pid, pid_file, f"HLS downloader '{index_code}'", force):
            downloader_stopped += 1
    if downloader_pids:
        logger.info(f"[INFO] Stopped {downloader_stopped}/{len(downloader_pids)} HLS downloaders")

    # HLS cleanup process.
    cleanup_pid = read_hls_cleanup_pid()
    if cleanup_pid:
        _stop_tracked_process(cleanup_pid, get_hls_cleanup_pid_file_path(), "HLS cleanup", force)
    return True
def restart_service():
    """Stop everything this launcher manages, pause for two seconds, then start again.

    Returns:
        The result of start_service(): True if at least one service group was
        started.
    """
    logger.info("[INFO] Restarting all service groups...")
    stop_service()
    pause_seconds = 2  # short gap between the stop and start phases
    time.sleep(pause_seconds)
    restarted = start_service()
    return restarted
def main():
    """Parse the command line and run start / stop / restart / status.

    Exits with status 0 on success and 1 on failure. ``status`` always exits 0.
    """
    parser = argparse.ArgumentParser(description="RTSP Service Manager")
    parser.add_argument(
        "command",
        nargs="?",
        choices=["start", "stop", "restart", "status"],
        default="start",
        help="Command to execute: start, stop, restart, status (default: start)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force stop (send SIGKILL immediately)",
    )
    args = parser.parse_args()

    if args.command == "status":
        # status is informational only; its result never affects the exit code.
        status_service()
        sys.exit(0)

    # argparse `choices` guarantees the command is one of these keys.
    actions = {
        "start": start_service,
        "stop": lambda: stop_service(force=args.force),
        "restart": restart_service,
    }
    succeeded = actions[args.command]()
    sys.exit(0 if succeeded else 1)


if __name__ == "__main__":
    main()