diff --git a/config.yaml b/config.yaml index 420fd8d..d5d5f55 100644 --- a/config.yaml +++ b/config.yaml @@ -30,13 +30,19 @@ database: pool_recycle: 3600 echo: false -cameras: -- id: 1 - index: testindexcode - name: Entrance - params: - roi_points: - - [0.15, 0.001] - - [0.5, 0.001] - - [1.0, 0.8] - - [0.35, 1.0] +# 服务组配置:每个组有独立的 WebSocket 和算法类型 +service_groups: +- name: "kadian_group" # 服务组名称 + ws_host: "0.0.0.0" # WebSocket 服务地址 + ws_port: 8765 # WebSocket 服务端口 + algorithm: "kadian" # 算法类型 + cameras: # 该组下的摄像头列表 + - id: 1 + index: testindexcode + name: Entrance + params: + roi_points: + - [0.15, 0.001] + - [0.5, 0.001] + - [1.0, 0.8] + - [0.35, 1.0] diff --git a/main_start.py b/main_start.py index d9ecd9c..2a192e1 100644 --- a/main_start.py +++ b/main_start.py @@ -11,22 +11,22 @@ import os import signal import argparse import time -from typing import List +from typing import List, Dict from common.camera_config import CameraConfig from utils.logger import get_logger logger = get_logger(__name__) -# PID 文件路径 -PID_FILE = "rtsp_service.pid" +# PID 文件目录 +PID_DIR = "pids" -def load_cameras_from_yaml(config_path: str = "config.yaml") -> List[dict]: - """从 YAML 文件加载摄像头配置""" +def load_service_groups_from_yaml(config_path: str = "config.yaml") -> List[dict]: + """从 YAML 文件加载服务组配置""" with open(config_path, "r", encoding="utf-8") as f: cfg = yaml.safe_load(f) - return cfg.get("cameras", []) + return cfg.get("service_groups", []) def cameras_to_base64_json(cameras: List[dict]) -> str: @@ -35,39 +35,36 @@ def cameras_to_base64_json(cameras: List[dict]) -> str: return base64.b64encode(json_str.encode('utf-8')).decode('ascii') -def start_rtsp_service(cameras_base64: str, script_path: str = "rtsp_service_ws_kadian.py"): - """启动 RTSP 服务子进程(后台运行)""" - cmd = [ - sys.executable, # 当前 Python 解释器路径 - script_path, - "--cameras", cameras_base64 - ] - - logger.info(f"[INFO] Starting RTSP service with command: python {script_path} --cameras ") - - # 使用 start_new_session=True 创建新会话,类似 nohup 效果 - process = subprocess.Popen(cmd, start_new_session=True) - - return process +def get_pid_file_path(group_name: str) -> str: + """获取服务组的 PID 文件路径""" + return os.path.join(PID_DIR, f"{group_name}.pid") -def save_pid(pid: int): +def ensure_pid_dir(): + """确保 PID 目录存在""" + if not os.path.exists(PID_DIR): + os.makedirs(PID_DIR) + + +def save_pid(pid: int, group_name: str): """保存进程ID到文件""" + ensure_pid_dir() + pid_file = get_pid_file_path(group_name) try: - with open(PID_FILE, "w") as f: + with open(pid_file, "w") as f: f.write(str(pid)) - logger.info(f"[INFO] Saved PID {pid} to {PID_FILE}") + logger.info(f"[INFO] Saved PID {pid} to {pid_file}") except Exception as e: logger.error(f"[ERROR] Failed to save PID file: {e}") -def read_pid(): +def read_pid(group_name: str): """从PID文件读取进程ID""" + pid_file = get_pid_file_path(group_name) try: - with open(PID_FILE, "r") as f: + with open(pid_file, "r") as f: return int(f.read().strip()) except FileNotFoundError: - logger.warning(f"[WARN] PID file {PID_FILE} not found") return None except Exception as e: logger.error(f"[ERROR] Failed to read PID file: {e}") @@ -77,127 +74,169 @@ def read_pid(): def is_process_running(pid: int): """检查进程是否在运行""" try: - # 发送信号0,不实际发送信号,仅检查进程是否存在 os.kill(pid, 0) return True except OSError: return False -def start_service(): - """启动服务""" - # 检查是否已经在运行 - pid = read_pid() - if pid and is_process_running(pid): - logger.warning(f"[WARN] Service is already running with PID {pid}") - return False +def start_rtsp_service(group: dict, script_path: str = "rtsp_service_ws_kadian.py"): + """启动 RTSP 服务子进程(后台运行)""" + cameras = group.get("cameras", []) + ws_host = group.get("ws_host", "0.0.0.0") + ws_port = group.get("ws_port", 8765) + algorithm = group.get("algorithm", "") + group_name = group.get("name", "default") + cameras_base64 = cameras_to_base64_json(cameras) + + cmd = [ + sys.executable, + script_path, + "--cameras", cameras_base64, + "--ws-host", ws_host, + "--ws-port", str(ws_port), + "--algorithm", algorithm + ] + + logger.info(f"[INFO] Starting service group '{group_name}': ws={ws_host}:{ws_port}, algorithm={algorithm}") + + process = subprocess.Popen(cmd, start_new_session=True) + return process, group_name + + +def start_service(): + """启动所有服务组""" config_path = "config.yaml" # 1. 读取配置 - cameras_data = load_cameras_from_yaml(config_path) - logger.info(f"[INFO] Loaded {len(cameras_data)} cameras from {config_path}") + service_groups = load_service_groups_from_yaml(config_path) + logger.info(f"[INFO] Loaded {len(service_groups)} service groups from {config_path}") - if not cameras_data: - logger.error("[ERROR] No cameras found in config, exiting...") + if not service_groups: + logger.error("[ERROR] No service groups found in config, exiting...") return False - # 2. 转换为 base64 JSON - cameras_base64 = cameras_to_base64_json(cameras_data) + # 2. 启动每个服务组 + started_count = 0 + for group in service_groups: + group_name = group.get("name", "default") + + # 检查是否已经在运行 + pid = read_pid(group_name) + if pid and is_process_running(pid): + logger.warning(f"[WARN] Service group '{group_name}' is already running with PID {pid}") + continue + + try: + process, name = start_rtsp_service(group) + time.sleep(0.5) + save_pid(process.pid, name) + logger.info(f"[INFO] Service group '{name}' started with PID {process.pid}") + started_count += 1 + except Exception as e: + logger.error(f"[ERROR] Failed to start service group '{group_name}': {e}") - # 3. 启动子进程 - try: - process = start_rtsp_service(cameras_base64) - # 等待一下确保进程启动 - time.sleep(1) - - # 4. 保存PID - save_pid(process.pid) - - logger.info(f"[INFO] Service started successfully with PID {process.pid}") - logger.info("[INFO] Main process exiting, service will continue running in background") - return True - except Exception as e: - logger.error(f"[ERROR] Failed to start service: {e}") - return False + logger.info(f"[INFO] Started {started_count}/{len(service_groups)} service groups") + return started_count > 0 def status_service(): - """检查服务状态""" - pid = read_pid() - if not pid: - logger.info("[INFO] Service is not running (no PID file)") + """检查所有服务组状态""" + config_path = "config.yaml" + service_groups = load_service_groups_from_yaml(config_path) + + if not service_groups: + logger.info("[INFO] No service groups configured") return False - if is_process_running(pid): - logger.info(f"[INFO] Service is running with PID {pid}") - return True - else: - logger.info(f"[INFO] Service is not running (PID {pid} not found), cleaning up PID file") - try: - os.remove(PID_FILE) - except: - pass - return False + running_count = 0 + for group in service_groups: + group_name = group.get("name", "default") + pid = read_pid(group_name) + + if pid and is_process_running(pid): + logger.info(f"[INFO] Service group '{group_name}' is running with PID {pid}") + running_count += 1 + else: + logger.info(f"[INFO] Service group '{group_name}' is not running") + # 清理无效的PID文件 + if pid: + try: + os.remove(get_pid_file_path(group_name)) + except: + pass + + logger.info(f"[INFO] {running_count}/{len(service_groups)} service groups running") + return running_count > 0 def stop_service(force=False): - """停止服务""" - pid = read_pid() - if not pid: - logger.error("[ERROR] No PID file found, service may not be running") - return False + """停止所有服务组""" + config_path = "config.yaml" + service_groups = load_service_groups_from_yaml(config_path) - if not is_process_running(pid): - logger.warning(f"[WARN] Process with PID {pid} is not running, cleaning up PID file") + if not service_groups: + logger.info("[INFO] No service groups configured") + return True + + stopped_count = 0 + for group in service_groups: + group_name = group.get("name", "default") + pid = read_pid(group_name) + + if not pid: + logger.info(f"[INFO] Service group '{group_name}' has no PID file") + continue + + if not is_process_running(pid): + logger.warning(f"[WARN] Service group '{group_name}' PID {pid} not running, cleaning up") + try: + os.remove(get_pid_file_path(group_name)) + except: + pass + stopped_count += 1 + continue + try: - os.remove(PID_FILE) - except: - pass - return True + if force: + logger.info(f"[INFO] Force killing service group '{group_name}' (PID {pid})") + os.kill(pid, signal.SIGKILL) + else: + logger.info(f"[INFO] Stopping service group '{group_name}' (PID {pid})") + os.kill(pid, signal.SIGTERM) + + # 等待进程结束 + for i in range(10): + if not is_process_running(pid): + break + time.sleep(1) + + if is_process_running(pid): + logger.warning(f"[WARN] Force killing service group '{group_name}'") + os.kill(pid, signal.SIGKILL) + time.sleep(1) + + # 清理PID文件 + try: + os.remove(get_pid_file_path(group_name)) + except: + pass + + logger.info(f"[INFO] Service group '{group_name}' stopped") + stopped_count += 1 + except Exception as e: + logger.error(f"[ERROR] Failed to stop service group '{group_name}': {e}") - # 发送终止信号 - try: - if force: - logger.info(f"[INFO] Force killing process {pid} with SIGKILL") - os.kill(pid, signal.SIGKILL) - else: - logger.info(f"[INFO] Gracefully stopping process {pid} with SIGTERM") - os.kill(pid, signal.SIGTERM) - - # 等待进程结束(最多10秒) - for i in range(10): - if not is_process_running(pid): - break - time.sleep(1) - - # 如果进程还在,强制杀死 - if is_process_running(pid): - logger.warning(f"[WARN] Process {pid} still running after SIGTERM, sending SIGKILL") - os.kill(pid, signal.SIGKILL) - time.sleep(1) - - # 清理PID文件 - if os.path.exists(PID_FILE): - os.remove(PID_FILE) - - logger.info(f"[INFO] Service stopped successfully (PID: {pid})") - return True - except Exception as e: - logger.error(f"[ERROR] Failed to stop service: {e}") - return False + logger.info(f"[INFO] Stopped {stopped_count}/{len(service_groups)} service groups") + return True def restart_service(): - """重启服务""" - logger.info("[INFO] Restarting service...") - - # 先停止 - if status_service(): - stop_service() - time.sleep(2) # 等待进程完全停止 - - # 再启动 + """重启所有服务组""" + logger.info("[INFO] Restarting all service groups...") + stop_service() + time.sleep(2) return start_service() @@ -220,8 +259,8 @@ def main(): success = restart_service() sys.exit(0 if success else 1) elif args.command == "status": - success = status_service() - sys.exit(0 if success else 0) # 状态检查总是返回0退出码 + status_service() + sys.exit(0) if __name__ == "__main__": diff --git a/rtsp_service_ws_kadian.py b/rtsp_service_ws_kadian.py index 4370a94..f258b47 100644 --- a/rtsp_service_ws_kadian.py +++ b/rtsp_service_ws_kadian.py @@ -25,9 +25,6 @@ from utils.web_socket_sender import WebSocketSender from utils.logger import get_logger logger = get_logger(__name__) -WS_HOST = "0.0.0.0" -WS_PORT = 8765 - # ========================= RTSP 抓流线程 ========================= class RTSPCaptureWorker(threading.Thread): @@ -175,8 +172,11 @@ class RTSPCaptureWorker(threading.Thread): # ========================= 服务主类 ========================= class RTSPService: - def __init__(self, cameras: list[CameraConfig]): + def __init__(self, cameras: list[CameraConfig], ws_host: str = "0.0.0.0", ws_port: int = 8765, algorithm: str = ""): self.cameras = cameras + self.ws_host = ws_host + self.ws_port = ws_port + self.algorithm = algorithm self.stop_event = threading.Event() self.raw_queue = queue.Queue(maxsize=2) @@ -184,7 +184,7 @@ class RTSPService: self.capture_workers = [] self.processor = FrameProcessorWorker(self.raw_queue, self.ws_queue, self.stop_event, self.cameras) - self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, WS_HOST, WS_PORT) + self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, self.ws_host, self.ws_port) def start(self): self.ws_sender.start() @@ -193,7 +193,7 @@ class RTSPService: w = RTSPCaptureWorker(cam, self.raw_queue, self.stop_event) w.start() self.capture_workers.append(w) - logger.info("[INFO] Kadian RTSP Service started") + logger.info(f"[INFO] RTSP Service started (algorithm={self.algorithm}, ws={self.ws_host}:{self.ws_port})") def stop(self): self.stop_event.set() @@ -241,9 +241,12 @@ def parse_cameras_from_yaml(yaml_path: str) -> list[CameraConfig]: if __name__ == "__main__": - parser = argparse.ArgumentParser(description="RTSP Service for Kadian Detection") + parser = argparse.ArgumentParser(description="RTSP Service for Detection") parser.add_argument("--cameras", type=str, help="Cameras config in JSON format (or base64 encoded JSON)") parser.add_argument("--config", type=str, default="config.yaml", help="Path to config YAML file") + parser.add_argument("--ws-host", type=str, default="0.0.0.0", help="WebSocket host") + parser.add_argument("--ws-port", type=int, default=8765, help="WebSocket port") + parser.add_argument("--algorithm", type=str, default="", help="Algorithm type") args = parser.parse_args() # 优先使用命令行传入的 cameras JSON,否则读取配置文件 @@ -258,7 +261,7 @@ if __name__ == "__main__": logger.error("[ERROR] No cameras configured, exiting...") sys.exit(1) - service = RTSPService(cameras) + service = RTSPService(cameras, ws_host=args.ws_host, ws_port=args.ws_port, algorithm=args.algorithm) service.start() try: while True: