550 lines
19 KiB
Python
550 lines
19 KiB
Python
# main_start.py
|
||
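# Main launcher: reads config.yaml and uses subprocess to start one service
# script per service group (rtsp_service_ws_kadian.py or hls_service_ws_kadian.py)
# plus one hls_downloader.py process per camera of an HLS group.
# Supports the start, stop, restart and status commands; defaults to start.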
|
||
|
||
import json
|
||
import yaml
|
||
import base64
|
||
import subprocess
|
||
import sys
|
||
import os
|
||
import signal
|
||
import argparse
|
||
import time
|
||
import glob
|
||
from typing import List, Dict, Optional
|
||
|
||
from common.camera_config import CameraConfig
|
||
from utils.logger import get_logger
|
||
|
||
# Module-wide logger, obtained from the project's utils.logger helper.
logger = get_logger(__name__)
|
||
|
||
# Directory that holds the PID files written by this launcher
PID_DIR = "pids"

# File-name prefix used for the PID files of HLS downloader processes
HLS_DOWNLOADER_PID_PREFIX = "hls_downloader_"
|
||
|
||
|
||
def load_debug_mode(config_path: str = "config.yaml") -> bool:
    """Read the ``debug_mode`` flag from the YAML config file.

    Returns the value stored under ``debug_mode`` (``False`` when the key is
    absent).  Any error while opening, parsing or querying the file also
    yields ``False``.
    """
    try:
        with open(config_path, "r", encoding="utf-8") as config_file:
            settings = yaml.safe_load(config_file)
        flag = settings.get("debug_mode", False)
    except Exception:
        flag = False
    return flag
|
||
|
||
|
||
def load_downloader_debug_mode(config_path: str = "config.yaml") -> bool:
    """Read the ``downloader_debug_mode`` flag from the YAML config file.

    Returns the value stored under ``downloader_debug_mode`` (``False`` when
    the key is absent).  Any error while opening, parsing or querying the
    file also yields ``False``.
    """
    try:
        with open(config_path, "r", encoding="utf-8") as config_file:
            settings = yaml.safe_load(config_file)
        flag = settings.get("downloader_debug_mode", False)
    except Exception:
        flag = False
    return flag
|
||
|
||
|
||
# Debug switches, read from the config file when this module is loaded
DEBUG_MODE = load_debug_mode()
DOWNLOADER_DEBUG_MODE = load_downloader_debug_mode()

# Running counter used to hand out device IDs in turn
device_counter = 0
|
||
|
||
|
||
def get_next_device(total_devices: int) -> int:
    """Return the next device ID, cycling through ``0 .. total_devices - 1``.

    Each call advances the module-level ``device_counter`` by one.
    """
    global device_counter
    assigned = device_counter % total_devices
    device_counter = device_counter + 1
    return assigned
|
||
|
||
|
||
def load_service_groups_from_yaml(config_path: str = "config.yaml") -> List[dict]:
    """Load the ``service_groups`` list from the YAML config file.

    Args:
        config_path: Path of the YAML configuration file.

    Returns:
        The value stored under ``service_groups``; an empty list when the
        file is empty, the key is missing, or its value is null.

    Errors raised while opening or parsing the file propagate to the caller.
    """
    with open(config_path, "r", encoding="utf-8") as f:
        # safe_load yields None for an empty document; fall back to {} so the
        # lookup below does not fail with AttributeError.
        cfg = yaml.safe_load(f) or {}
    # A key that is present but null would otherwise hand None to callers
    # that take len() of the result.
    return cfg.get("service_groups") or []
|
||
|
||
|
||
def load_hls_config(config_path: str = "config.yaml") -> dict:
    """Load the HLS downloader settings from the YAML config file.

    Returns a dict with the keys ``hls_root_path``, ``daily_rotate_hour``,
    ``retention_days``, ``retry_interval`` and ``total_devices``.  A key
    missing from the file gets its default; if the file cannot be read or
    parsed, the error is logged and every key gets its default.
    """
    # (result key, key in config.yaml, default value)
    fields = (
        ("hls_root_path", "hls_root_path", ""),
        ("daily_rotate_hour", "hls_downloader_daily_rotate_hour", 3),
        ("retention_days", "hls_downloader_retention_days", 3),
        ("retry_interval", "hls_downloader_retry_interval_seconds", 10),
        ("total_devices", "total_devices", 1),
    )
    try:
        with open(config_path, "r", encoding="utf-8") as config_file:
            settings = yaml.safe_load(config_file)
        return {
            result_key: settings.get(yaml_key, default)
            for result_key, yaml_key, default in fields
        }
    except Exception as e:
        logger.error(f"[ERROR] Failed to load HLS config: {e}")
        return {result_key: default for result_key, _, default in fields}
|
||
|
||
|
||
def cameras_to_base64_json(cameras: List[dict]) -> str:
    """Serialize the camera configs to JSON and return it base64-encoded.

    The JSON text keeps non-ASCII characters as-is and is encoded as UTF-8
    before the base64 step; the result is an ASCII string.
    """
    payload = json.dumps(cameras, ensure_ascii=False).encode("utf-8")
    encoded = base64.b64encode(payload)
    return encoded.decode("ascii")
|
||
|
||
|
||
def get_pid_file_path(group_name: str) -> str:
    """Return the PID file path of a service group: ``<PID_DIR>/<group_name>.pid``."""
    pid_filename = "{}.pid".format(group_name)
    return os.path.join(PID_DIR, pid_filename)
|
||
|
||
|
||
def get_hls_downloader_pid_file_path(index_code: str) -> str:
    """Return the PID file path of the HLS downloader for one camera index code.

    The file name is the downloader prefix, the index code and ``.pid``.
    """
    pid_filename = "{}{}.pid".format(HLS_DOWNLOADER_PID_PREFIX, index_code)
    return os.path.join(PID_DIR, pid_filename)
|
||
|
||
|
||
def ensure_pid_dir():
    """Create the PID directory (and missing parents) if it does not exist.

    Raises:
        OSError: If the directory cannot be created, including the case
            where ``PID_DIR`` exists but is not a directory.
    """
    # exist_ok=True replaces the separate exists() check, so two launcher
    # invocations starting at the same moment cannot trip over each other.
    os.makedirs(PID_DIR, exist_ok=True)
|
||
|
||
|
||
def save_pid(pid: int, group_name: str):
    """Write a service group's process ID to its PID file.

    The PID directory is created first if needed.  A failure while writing
    is logged as an error and not re-raised.
    """
    ensure_pid_dir()
    pid_file = get_pid_file_path(group_name)
    try:
        with open(pid_file, "w") as handle:
            handle.write(f"{pid}")
        logger.info(f"[INFO] Saved PID {pid} to {pid_file}")
    except Exception as e:
        logger.error(f"[ERROR] Failed to save PID file: {e}")
|
||
|
||
|
||
def save_hls_downloader_pid(pid: int, index_code: str):
    """Write an HLS downloader's process ID to its PID file.

    The PID directory is created first if needed.  A failure while writing
    is logged as an error and not re-raised.
    """
    ensure_pid_dir()
    pid_file = get_hls_downloader_pid_file_path(index_code)
    try:
        with open(pid_file, "w") as handle:
            handle.write(f"{pid}")
        logger.info(f"[INFO] Saved HLS downloader PID {pid} to {pid_file}")
    except Exception as e:
        logger.error(f"[ERROR] Failed to save HLS downloader PID file: {e}")
|
||
|
||
|
||
def read_pid(group_name: str):
    """Return the process ID recorded for a service group, or ``None``.

    ``None`` is returned when the PID file does not exist, and also (after
    logging an error) when it cannot be read or parsed as an integer.
    """
    pid_file = get_pid_file_path(group_name)
    try:
        with open(pid_file, "r") as handle:
            raw = handle.read()
        return int(raw.strip())
    except FileNotFoundError:
        # No PID file simply means nothing was recorded for this group.
        return None
    except Exception as e:
        logger.error(f"[ERROR] Failed to read PID file: {e}")
        return None
|
||
|
||
|
||
def read_hls_downloader_pid(index_code: str):
    """Return the process ID recorded for an HLS downloader, or ``None``.

    ``None`` is returned when the PID file does not exist, and also (after
    logging an error) when it cannot be read or parsed as an integer.
    """
    pid_file = get_hls_downloader_pid_file_path(index_code)
    try:
        with open(pid_file, "r") as handle:
            raw = handle.read()
        return int(raw.strip())
    except FileNotFoundError:
        # No PID file simply means nothing was recorded for this camera.
        return None
    except Exception as e:
        logger.error(f"[ERROR] Failed to read HLS downloader PID file: {e}")
        return None
|
||
|
||
|
||
def is_process_running(pid: int):
    """Report whether a process with the given PID currently exists.

    Args:
        pid: Process ID to probe.

    Returns:
        ``True`` if the process exists, including when it belongs to another
        user and may not be signalled; ``False`` if it does not exist or if
        ``pid`` is not a positive integer.
    """
    # os.kill() interprets 0 and negative values as "process group" or
    # "every process", so such values (e.g. from a corrupted PID file) must
    # never be probed or reported as a running process.
    if not isinstance(pid, int) or pid <= 0:
        return False
    try:
        os.kill(pid, 0)  # signal 0 performs the checks without signalling
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists; we merely lack permission to signal it.
        return True
    except (OSError, OverflowError):
        return False
    return True
|
||
|
||
|
||
def get_script_path(video_source_type: str) -> str:
    """Return the service script for a video source type.

    ``"hls"`` selects the HLS service script; every other value falls back
    to the RTSP service script.
    """
    is_hls = video_source_type == "hls"
    return "hls_service_ws_kadian.py" if is_hls else "rtsp_service_ws_kadian.py"
|
||
|
||
|
||
def start_service_group(group: dict, hls_config: dict = None, device_id: int = None):
    """Launch the service script of one service group as a child process.

    The script is chosen from the group's ``video_source_type`` and receives
    the camera list as base64-encoded JSON on the command line.  When
    ``device_id`` is given it is exported to the child through the
    ``ASCEND_RT_VISIBLE_DEVICES`` environment variable.  With ``DEBUG_MODE``
    on, the child stays in the launcher's session; otherwise it is started
    in a new session.

    Returns:
        A ``(process, group_name)`` tuple, ``process`` being the
        ``subprocess.Popen`` handle of the child.
    """
    group_name = group.get("name", "default")
    video_source_type = group.get("video_source_type", "rtsp")
    ws_host = group.get("ws_host", "0.0.0.0")
    ws_port = group.get("ws_port", 8765)
    algorithm = group.get("algorithm", "")

    cmd = [
        sys.executable,
        get_script_path(video_source_type),
        "--cameras", cameras_to_base64_json(group.get("cameras", [])),
        "--ws-host", ws_host,
        "--ws-port", str(ws_port),
        "--algorithm", algorithm,
    ]
    # Only HLS groups are given the HLS root directory argument.
    if video_source_type == "hls" and hls_config:
        cmd += ["--hls-root-path", hls_config.get("hls_root_path", "")]

    # The child inherits the launcher's environment, plus the device pin.
    env = os.environ.copy()
    if device_id is None:
        logger.info(f"[INFO] Starting service group '{group_name}': ws={ws_host}:{ws_port}, algorithm={algorithm}, source={video_source_type}")
    else:
        env["ASCEND_RT_VISIBLE_DEVICES"] = str(device_id)
        logger.info(f"[INFO] Starting service group '{group_name}': ws={ws_host}:{ws_port}, algorithm={algorithm}, source={video_source_type}, device={device_id}")

    process = subprocess.Popen(cmd, env=env, start_new_session=not DEBUG_MODE)
    return process, group_name
|
||
|
||
|
||
def start_hls_downloader(camera: dict, hls_config: dict) -> Optional[subprocess.Popen]:
    """Launch ``hls_downloader.py`` for one camera.

    Returns the ``subprocess.Popen`` handle, or ``None`` (after a warning)
    when the camera has no ``index`` value or a downloader recorded for that
    index code is still running.  With ``DOWNLOADER_DEBUG_MODE`` on, the
    child stays in the launcher's session; otherwise it is started in a new
    session.
    """
    index_code = camera.get("index", "")
    camera_name = camera.get("name", "")
    camera_id = camera.get("id", 0)

    if not index_code:
        logger.warning("[WARN] Camera has no index_code, skipping HLS downloader")
        return None

    # Do not start a second downloader for the same camera.
    pid = read_hls_downloader_pid(index_code)
    if pid and is_process_running(pid):
        logger.warning(f"[WARN] HLS downloader for '{index_code}' is already running with PID {pid}")
        return None

    cmd = [sys.executable, "hls_downloader.py"]
    for option, value in (
        ("--index-code", index_code),
        ("--camera-name", camera_name),
        ("--camera-id", str(camera_id)),
        ("--hls-root-path", hls_config.get("hls_root_path", "")),
        ("--rotate-hour", str(hls_config.get("daily_rotate_hour", 3))),
        ("--retention-days", str(hls_config.get("retention_days", 3))),
        ("--retry-interval", str(hls_config.get("retry_interval", 10))),
    ):
        cmd += [option, value]

    logger.info(f"[INFO] Starting HLS downloader for camera '{camera_name}' (index: {index_code})")

    return subprocess.Popen(cmd, start_new_session=not DOWNLOADER_DEBUG_MODE)
|
||
|
||
|
||
def _launch_service_group(group: dict, hls_config: dict, total_devices: int):
    """Start one service group unless it already runs; return ``(process, name)`` or ``None``."""
    group_name = group.get("name", "default")

    pid = read_pid(group_name)
    if pid and is_process_running(pid):
        logger.warning(f"[WARN] Service group '{group_name}' is already running with PID {pid}")
        return None

    try:
        # Devices are only assigned when more than one is configured.
        device_id = get_next_device(total_devices) if total_devices > 1 else None
        process, name = start_service_group(group, hls_config, device_id=device_id)
        time.sleep(0.5)
        # A child that already exited during the settling delay did not
        # start successfully; do not record its PID or count it.
        exit_code = process.poll()
        if exit_code is not None:
            logger.error(f"[ERROR] Service group '{name}' exited during startup with code {exit_code}")
            return None
        save_pid(process.pid, name)
        logger.info(f"[INFO] Service group '{name}' started with PID {process.pid}")
        return process, name
    except Exception as e:
        logger.error(f"[ERROR] Failed to start service group '{group_name}': {e}")
        return None


def _launch_hls_downloaders(cameras: list, hls_config: dict) -> list:
    """Start a downloader per camera with an index code; return the started ``(process, index_code)`` pairs."""
    started = []
    for camera in cameras:
        index_code = camera.get("index", "")
        if not index_code:
            continue
        try:
            downloader_proc = start_hls_downloader(camera, hls_config)
            if downloader_proc is None:
                continue
            time.sleep(0.3)
            # Same early-exit check as for service groups.
            exit_code = downloader_proc.poll()
            if exit_code is not None:
                logger.error(f"[ERROR] HLS downloader for '{index_code}' exited during startup with code {exit_code}")
                continue
            save_hls_downloader_pid(downloader_proc.pid, index_code)
            logger.info(f"[INFO] HLS downloader for '{index_code}' started with PID {downloader_proc.pid}")
            started.append((downloader_proc, index_code))
        except Exception as e:
            logger.error(f"[ERROR] Failed to start HLS downloader for '{index_code}': {e}")
    return started


def start_service():
    """Start every configured service group and its HLS downloaders.

    Reads ``config.yaml``, launches each service group that is not already
    running and, for groups whose source type is ``hls`` (when an HLS root
    path is configured), one downloader per camera.  PIDs of started
    processes are saved to PID files.  In the respective debug modes the
    function then blocks until the started children exit.

    Returns:
        ``True`` if at least one service group was started by this call,
        ``False`` otherwise (including when no groups are configured).
    """
    config_path = "config.yaml"

    # 1. Load configuration (tolerating null values for list/int settings).
    service_groups = load_service_groups_from_yaml(config_path) or []
    hls_config = load_hls_config(config_path)
    total_devices = hls_config.get("total_devices") or 1
    logger.info(f"[INFO] Loaded {len(service_groups)} service groups from {config_path}")
    logger.info(f"[INFO] HLS config: root_path={hls_config.get('hls_root_path')}, rotate_hour={hls_config.get('daily_rotate_hour')}, total_devices={total_devices}")

    if not service_groups:
        logger.error("[ERROR] No service groups found in config, exiting...")
        return False

    # 2. Launch each group, then the downloaders of HLS groups.
    processes = []  # service children, awaited in debug mode
    downloader_processes = []  # downloader children, awaited in debug mode

    for group in service_groups:
        launched = _launch_service_group(group, hls_config, total_devices)
        if launched is not None:
            processes.append(launched)

        # Downloaders are attempted even when the group itself was already
        # running; start_hls_downloader skips the ones that are still alive.
        is_hls = group.get("video_source_type", "rtsp") == "hls"
        if is_hls and hls_config.get("hls_root_path"):
            cameras = group.get("cameras") or []
            downloader_processes.extend(_launch_hls_downloaders(cameras, hls_config))

    started_count = len(processes)
    downloader_count = len(downloader_processes)
    logger.info(f"[INFO] Started {started_count}/{len(service_groups)} service groups")
    logger.info(f"[INFO] Started {downloader_count} HLS downloaders")

    # 3. In debug modes the children run in the foreground: wait for them.
    if DEBUG_MODE and processes:
        logger.info("[DEBUG] Waiting for service processes...")
        for process, _name in processes:
            process.wait()

    if DOWNLOADER_DEBUG_MODE and downloader_processes:
        logger.info("[DEBUG] Waiting for downloader processes...")
        for process, _name in downloader_processes:
            process.wait()

    return started_count > 0
|
||
|
||
|
||
def get_all_hls_downloader_pids() -> List[tuple]:
    """List the HLS downloader PID files found in the PID directory.

    Returns ``(index_code, pid_file)`` tuples, where ``index_code`` is the
    file name with the downloader prefix and the ``.pid`` suffix removed.
    """
    prefix_len = len(HLS_DOWNLOADER_PID_PREFIX)
    suffix_len = len(".pid")
    pid_pattern = os.path.join(PID_DIR, "{}*.pid".format(HLS_DOWNLOADER_PID_PREFIX))
    return [
        (os.path.basename(pid_file)[prefix_len:-suffix_len], pid_file)
        for pid_file in glob.glob(pid_pattern)
    ]
|
||
|
||
|
||
def _report_tracked_process(label: str, pid, pid_file: str) -> bool:
    """Log whether the process behind a PID file runs; remove the file when stale."""
    if pid and is_process_running(pid):
        logger.info(f"[INFO] {label} is running with PID {pid}")
        return True

    logger.info(f"[INFO] {label} is not running")
    if pid:
        # A PID was recorded but the process is gone: drop the stale file.
        try:
            os.remove(pid_file)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"[WARN] Could not remove stale PID file {pid_file}: {e}")
    return False


def status_service():
    """Log the running state of all service groups and HLS downloaders.

    Service groups come from ``config.yaml``; downloaders are discovered
    from their PID files, so they are reported even when no service group
    is configured.  PID files whose process no longer runs are removed.

    Returns:
        ``True`` if at least one configured service group is running.
    """
    config_path = "config.yaml"
    service_groups = load_service_groups_from_yaml(config_path) or []

    running_count = 0
    if not service_groups:
        logger.info("[INFO] No service groups configured")
    else:
        for group in service_groups:
            group_name = group.get("name", "default")
            if _report_tracked_process(
                f"Service group '{group_name}'",
                read_pid(group_name),
                get_pid_file_path(group_name),
            ):
                running_count += 1
        logger.info(f"[INFO] {running_count}/{len(service_groups)} service groups running")

    # HLS downloader status
    downloader_pids = get_all_hls_downloader_pids()
    downloader_running = 0
    for index_code, pid_file in downloader_pids:
        if _report_tracked_process(
            f"HLS downloader '{index_code}'",
            read_hls_downloader_pid(index_code),
            pid_file,
        ):
            downloader_running += 1

    if downloader_pids:
        logger.info(f"[INFO] {downloader_running}/{len(downloader_pids)} HLS downloaders running")

    return running_count > 0
|
||
|
||
|
||
def _remove_pid_file_quietly(pid_file: str):
    """Delete a PID file, ignoring its absence and logging other OS errors."""
    try:
        os.remove(pid_file)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning(f"[WARN] Could not remove PID file {pid_file}: {e}")


def _stop_tracked_process(label: str, pid: int, pid_file: str, force: bool) -> bool:
    """Stop one PID-file-tracked process; return True once it is gone and its PID file is cleaned up."""
    # Messages use the label both mid-sentence and at the sentence start.
    subject = label[:1].upper() + label[1:]

    if not is_process_running(pid):
        logger.warning(f"[WARN] {subject} PID {pid} not running, cleaning up")
        _remove_pid_file_quietly(pid_file)
        return True

    try:
        if force:
            logger.info(f"[INFO] Force killing {label} (PID {pid})")
            os.kill(pid, signal.SIGKILL)
        else:
            logger.info(f"[INFO] Stopping {label} (PID {pid})")
            os.kill(pid, signal.SIGTERM)

        # Poll once per second, for at most ten rounds, until it has exited.
        for _ in range(10):
            if not is_process_running(pid):
                break
            time.sleep(1)

        # Still alive after the grace period: escalate to SIGKILL.
        if is_process_running(pid):
            logger.warning(f"[WARN] Force killing {label}")
            os.kill(pid, signal.SIGKILL)
            time.sleep(1)
    except ProcessLookupError:
        # The process exited between the liveness check and the signal,
        # which is the outcome we wanted; fall through to the cleanup.
        pass
    except Exception as e:
        logger.error(f"[ERROR] Failed to stop {label}: {e}")
        return False

    _remove_pid_file_quietly(pid_file)
    logger.info(f"[INFO] {subject} stopped")
    return True


def stop_service(force=False):
    """Stop all service groups and all HLS downloaders.

    Service groups come from ``config.yaml``; downloaders are discovered
    from their PID files and are therefore stopped even when no service
    group is configured.  Each process gets SIGTERM (SIGKILL right away
    when ``force`` is true), is given a grace period, and is SIGKILLed if
    it is still alive afterwards.  PID files of stopped or already-dead
    processes are removed.

    Args:
        force: Send SIGKILL immediately instead of SIGTERM.

    Returns:
        Always ``True``; individual failures are logged.
    """
    config_path = "config.yaml"
    service_groups = load_service_groups_from_yaml(config_path) or []

    if not service_groups:
        logger.info("[INFO] No service groups configured")

    stopped_count = 0
    for group in service_groups:
        group_name = group.get("name", "default")
        pid = read_pid(group_name)
        if not pid:
            logger.info(f"[INFO] Service group '{group_name}' has no PID file")
            continue
        if _stop_tracked_process(
            f"service group '{group_name}'", pid, get_pid_file_path(group_name), force
        ):
            stopped_count += 1

    if service_groups:
        logger.info(f"[INFO] Stopped {stopped_count}/{len(service_groups)} service groups")

    # Stop every HLS downloader that has a PID file
    downloader_pids = get_all_hls_downloader_pids()
    downloader_stopped = 0
    for index_code, pid_file in downloader_pids:
        pid = read_hls_downloader_pid(index_code)
        if not pid:
            continue
        if _stop_tracked_process(f"HLS downloader '{index_code}'", pid, pid_file, force):
            downloader_stopped += 1

    if downloader_pids:
        logger.info(f"[INFO] Stopped {downloader_stopped}/{len(downloader_pids)} HLS downloaders")

    return True
|
||
|
||
|
||
def restart_service():
    """Stop everything, pause for two seconds, then start again.

    Returns whatever ``start_service()`` returns.
    """
    logger.info("[INFO] Restarting all service groups...")
    stop_service()
    # Short pause between the stop and the fresh start.
    time.sleep(2)
    restarted = start_service()
    return restarted
|
||
|
||
|
||
def main():
    """Parse the command line and run the requested command.

    For each accepted command the function ends in ``sys.exit``: ``status``
    exits with 0; ``start``, ``stop`` and ``restart`` exit with 0 when their
    handler returns a truthy value and with 1 otherwise.  ``--force`` is
    forwarded to ``stop`` only.
    """
    parser = argparse.ArgumentParser(description="RTSP Service Manager")
    parser.add_argument(
        "command",
        nargs="?",
        choices=["start", "stop", "restart", "status"],
        default="start",
        help="Command to execute: start, stop, restart, status (default: start)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force stop (send SIGKILL immediately)",
    )
    args = parser.parse_args()
    command = args.command

    # status reports only; its result does not affect the exit code.
    if command == "status":
        status_service()
        sys.exit(0)

    if command == "start":
        succeeded = start_service()
    elif command == "stop":
        succeeded = stop_service(force=args.force)
    elif command == "restart":
        succeeded = restart_service()
    else:
        return
    sys.exit(0 if succeeded else 1)
|
||
|
||
|
||
# Run the command-line interface only when executed as a script
if __name__ == "__main__":
    main()
|