From 74f555f38c273c33c6538d7d7c4b343ed3b288d8 Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Mon, 9 Mar 2026 12:17:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B8=85=E7=90=86=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E9=80=BB=E8=BE=91=EF=BC=8C=E6=94=B9=E5=88=B0main=5Fst?= =?UTF-8?q?art.py=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hls_cleanup.py | 139 ++++++++++++++++++++++++++++++++++++++++++++++ hls_downloader.py | 46 ++------------- main_start.py | 134 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 277 insertions(+), 42 deletions(-) create mode 100644 hls_cleanup.py diff --git a/hls_cleanup.py b/hls_cleanup.py new file mode 100644 index 0000000..6c611e1 --- /dev/null +++ b/hls_cleanup.py @@ -0,0 +1,139 @@ +# hls_cleanup.py +# HLS文件夹清理进程 +# 功能:定时清理过期的HLS会话文件夹,支持浮点数天数,不依赖摄像头配置 + +import os +import sys +import argparse +import signal +import shutil +import time +from datetime import datetime, timedelta +from typing import List + +from utils.logger import get_logger +logger = get_logger(__name__) + +running = True + + +def parse_args(): + """解析命令行参数""" + parser = argparse.ArgumentParser(description="HLS Folder Cleanup Service") + parser.add_argument("--hls-root-path", required=True, help="HLS root path") + parser.add_argument("--retention-days", required=True, type=float, + help="Retention days (supports float, e.g. 0.5 for 12 hours)") + parser.add_argument("--interval", required=True, type=int, + help="Cleanup interval in hours") + return parser.parse_args() + + +def parse_folder_time(folder_name: str) -> datetime: + """ + 从文件夹名称解析时间 + 格式: yyyyMMdd_HHmmss + """ + try: + date_str, time_str = folder_name.split("_") + return datetime.strptime(f"{date_str}{time_str}", "%Y%m%d%H%M%S") + except (ValueError, IndexError) as e: + raise ValueError(f"Invalid folder name format: {folder_name}") from e + + +def cleanup_hls_folders(hls_root_path: str, retention_days: float) -> tuple: + """ + 清理过期的HLS会话文件夹 + + Args: + hls_root_path: HLS根目录 + retention_days: 保留天数(支持浮点数) + + Returns: + (deleted_count, error_count) 删除数量和错误数量 + """ + if not os.path.exists(hls_root_path): + logger.info(f"HLS root path does not exist: {hls_root_path}") + return 0, 0 + + cutoff_time = datetime.now() - timedelta(days=retention_days) + deleted_count = 0 + error_count = 0 + + # 遍历所有摄像头目录 + for camera_folder in os.listdir(hls_root_path): + camera_path = os.path.join(hls_root_path, camera_folder) + + if not os.path.isdir(camera_path): + continue + + # 遍历该摄像头下的所有会话文件夹 + for session_folder in os.listdir(camera_path): + session_path = os.path.join(camera_path, session_folder) + + if not os.path.isdir(session_path): + continue + + try: + # 解析文件夹名称获取时间 + folder_time = parse_folder_time(session_folder) + + if folder_time < cutoff_time: + shutil.rmtree(session_path) + logger.info(f"Deleted: {session_path} (created: {folder_time})") + deleted_count += 1 + except ValueError as e: + # 文件夹名称格式不匹配,跳过 + logger.debug(f"Skipped: {session_path} ({e})") + except Exception as e: + logger.warning(f"Failed to delete {session_path}: {e}") + error_count += 1 + + return deleted_count, error_count + + +def signal_handler(signum, frame): + """信号处理器""" + global running + logger.info(f"Received signal {signum}, shutting down...") + running = False + + +def main(): + global running + + args = parse_args() + + # 注册信号处理器 + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + + logger.info(f"HLS Cleanup Service starting") + logger.info(f"Config: hls_root={args.hls_root_path}, " + f"retention_days={args.retention_days}, interval={args.interval}h") + + # 主循环 + while running: + try: + logger.info(f"Starting cleanup scan...") + deleted, errors = cleanup_hls_folders(args.hls_root_path, args.retention_days) + + if deleted > 0 or errors > 0: + logger.info(f"Cleanup completed: {deleted} deleted, {errors} errors " + f"(retention: {args.retention_days} days)") + else: + logger.info(f"Cleanup scan completed, no folders to delete " + f"(retention: {args.retention_days} days)") + except Exception as e: + logger.error(f"Error during cleanup: {e}") + + # 等待下一次轮询(每小时检查一次) + for _ in range(args.interval * 3600): + if not running: + break + time.sleep(1) + + logger.info("HLS Cleanup Service stopped") + + +if __name__ == "__main__": + main() diff --git a/hls_downloader.py b/hls_downloader.py index b5f7c65..7736fd5 100644 --- a/hls_downloader.py +++ b/hls_downloader.py @@ -1,6 +1,6 @@ # hls_downloader.py # HLS分片TS下载管理进程 -# 功能:管理ffmpeg下载HLS直播流,支持异常重试、定时轮换、文件清理 +# 功能:管理ffmpeg下载HLS直播流,支持异常重试、定时轮换 import os import sys @@ -8,7 +8,6 @@ import json import argparse import signal import subprocess -import shutil import time from datetime import datetime, timedelta from typing import Optional @@ -35,7 +34,6 @@ def parse_args(): parser.add_argument("--camera-id", required=True, type=int, help="Camera ID") parser.add_argument("--hls-root-path", required=True, help="HLS root path") parser.add_argument("--rotate-hour", required=True, type=int, help="Daily rotate hour (0-23)") - parser.add_argument("--retention-days", required=True, type=int, help="File retention days") parser.add_argument("--retry-interval", required=True, type=int, help="Retry interval seconds") return parser.parse_args() @@ -152,37 +150,6 @@ def stop_ffmpeg(): ffmpeg_process = None -def cleanup_old_folders(hls_root_path: str, index_code: str, retention_days: int): - """清理过期的下载文件夹""" - try: - index_folder = os.path.join(hls_root_path, index_code) - if not os.path.exists(index_folder): - return - - cutoff_date = datetime.now() - timedelta(days=retention_days) - cutoff_str = cutoff_date.strftime("%Y%m%d") - - deleted_count = 0 - for folder_name in os.listdir(index_folder): - folder_path = os.path.join(index_folder, folder_name) - if os.path.isdir(folder_path): - # 从文件夹名称解析日期 (yyyyMMdd_HHmmss) - try: - date_str = folder_name.split("_")[0] - if date_str < cutoff_str: - shutil.rmtree(folder_path) - logger.info(f"Deleted old folder: {folder_path}") - deleted_count += 1 - except Exception as e: - logger.warning(f"Failed to parse folder date {folder_name}: {e}") - - if deleted_count > 0: - logger.info(f"Cleaned up {deleted_count} old folders") - - except Exception as e: - logger.error(f"Error cleaning up old folders: {e}") - - def get_next_rotate_time(rotate_hour: int) -> datetime: """计算下次轮换时间""" now = datetime.now() @@ -243,7 +210,7 @@ def main(): logger.info(f"HLS Downloader starting for camera: {args.camera_name} (index: {args.index_code})") logger.info(f"Config: hls_root={args.hls_root_path}, rotate_hour={args.rotate_hour}, " - f"retention_days={args.retention_days}, retry_interval={args.retry_interval}") + f"retry_interval={args.retry_interval}") # 计算下次轮换时间 next_rotate_time = get_next_rotate_time(args.rotate_hour) @@ -284,17 +251,14 @@ def main(): now = datetime.now() if now >= next_rotate_time: logger.info("Daily rotation triggered") - + # 停止当前ffmpeg stop_ffmpeg() - - # 清理旧文件 - cleanup_old_folders(args.hls_root_path, args.index_code, args.retention_days) - + # 计算下次轮换时间 next_rotate_time = get_next_rotate_time(args.rotate_hour) logger.info(f"Next rotation scheduled at: {next_rotate_time}") - + # 继续循环,会重新启动ffmpeg time.sleep(1) continue diff --git a/main_start.py b/main_start.py index afc0956..ffc4788 100644 --- a/main_start.py +++ b/main_start.py @@ -25,6 +25,9 @@ PID_DIR = "pids" # HLS下载器PID文件前缀 HLS_DOWNLOADER_PID_PREFIX = "hls_downloader_" +# HLS清理进程PID文件 +HLS_CLEANUP_PID_FILE = "hls_cleanup.pid" + def load_debug_mode(config_path: str = "config.yaml") -> bool: """从配置文件读取调试模式""" @@ -164,6 +167,36 @@ def read_hls_downloader_pid(index_code: str): return None +def get_hls_cleanup_pid_file_path() -> str: + """获取HLS清理进程的 PID 文件路径""" + return os.path.join(PID_DIR, HLS_CLEANUP_PID_FILE) + + +def save_hls_cleanup_pid(pid: int): + """保存HLS清理进程ID到文件""" + ensure_pid_dir() + pid_file = get_hls_cleanup_pid_file_path() + try: + with open(pid_file, "w") as f: + f.write(str(pid)) + logger.info(f"[INFO] Saved HLS cleanup PID {pid} to {pid_file}") + except Exception as e: + logger.error(f"[ERROR] Failed to save HLS cleanup PID file: {e}") + + +def read_hls_cleanup_pid(): + """从PID文件读取HLS清理进程ID""" + pid_file = get_hls_cleanup_pid_file_path() + try: + with open(pid_file, "r") as f: + return int(f.read().strip()) + except FileNotFoundError: + return None + except Exception as e: + logger.error(f"[ERROR] Failed to read HLS cleanup PID file: {e}") + return None + + def is_process_running(pid: int): """检查进程是否在运行""" try: @@ -248,7 +281,6 @@ def start_hls_downloader(camera: dict, hls_config: dict) -> Optional[subprocess. "--camera-id", str(camera_id), "--hls-root-path", hls_config.get("hls_root_path", ""), "--rotate-hour", str(hls_config.get("daily_rotate_hour", 3)), - "--retention-days", str(hls_config.get("retention_days", 3)), "--retry-interval", str(hls_config.get("retry_interval", 10)) ] @@ -262,6 +294,39 @@ def start_hls_downloader(camera: dict, hls_config: dict) -> Optional[subprocess. return process +def start_hls_cleanup(hls_config: dict) -> Optional[subprocess.Popen]: + """启动HLS清理进程""" + hls_root_path = hls_config.get("hls_root_path", "") + retention_days = hls_config.get("retention_days", 3) + + if not hls_root_path: + logger.warning("[WARN] HLS root path not configured, skipping HLS cleanup") + return None + + # 检查是否已经在运行 + pid = read_hls_cleanup_pid() + if pid and is_process_running(pid): + logger.warning(f"[WARN] HLS cleanup is already running with PID {pid}") + return None + + cmd = [ + sys.executable, + "hls_cleanup.py", + "--hls-root-path", hls_root_path, + "--retention-days", str(retention_days), + "--interval", "1" # 每小时清理一次 + ] + + logger.info(f"[INFO] Starting HLS cleanup service (retention: {retention_days} days)") + + if DOWNLOADER_DEBUG_MODE: + process = subprocess.Popen(cmd) + else: + process = subprocess.Popen(cmd, start_new_session=True) + + return process + + def start_service(): """启动所有服务组""" config_path = "config.yaml" @@ -325,6 +390,18 @@ def start_service(): logger.info(f"[INFO] Started {started_count}/{len(service_groups)} service groups") logger.info(f"[INFO] Started {downloader_count} HLS downloaders") + # 启动HLS清理进程 + cleanup_process = None + if hls_config.get("hls_root_path"): + try: + cleanup_process = start_hls_cleanup(hls_config) + if cleanup_process: + time.sleep(0.3) + save_hls_cleanup_pid(cleanup_process.pid) + logger.info(f"[INFO] HLS cleanup service started with PID {cleanup_process.pid}") + except Exception as e: + logger.error(f"[ERROR] Failed to start HLS cleanup service: {e}") + # 根据各自的调试模式决定是否等待子进程 if DEBUG_MODE and processes: logger.info("[DEBUG] Waiting for service processes...") @@ -336,6 +413,10 @@ def start_service(): for process, name in downloader_processes: process.wait() + if DOWNLOADER_DEBUG_MODE and cleanup_process: + logger.info("[DEBUG] Waiting for cleanup process...") + cleanup_process.wait() + return started_count > 0 @@ -398,6 +479,18 @@ def status_service(): if downloader_pids: logger.info(f"[INFO] {downloader_running}/{len(downloader_pids)} HLS downloaders running") + # 检查HLS清理进程状态 + cleanup_pid = read_hls_cleanup_pid() + if cleanup_pid and is_process_running(cleanup_pid): + logger.info(f"[INFO] HLS cleanup service is running with PID {cleanup_pid}") + else: + logger.info(f"[INFO] HLS cleanup service is not running") + if cleanup_pid: + try: + os.remove(get_hls_cleanup_pid_file_path()) + except: + pass + return running_count > 0 @@ -511,6 +604,45 @@ def stop_service(force=False): if downloader_pids: logger.info(f"[INFO] Stopped {downloader_stopped}/{len(downloader_pids)} HLS downloaders") + # 停止HLS清理进程 + cleanup_pid = read_hls_cleanup_pid() + if cleanup_pid: + if not is_process_running(cleanup_pid): + logger.warning(f"[WARN] HLS cleanup PID {cleanup_pid} not running, cleaning up") + try: + os.remove(get_hls_cleanup_pid_file_path()) + except: + pass + else: + try: + if force: + logger.info(f"[INFO] Force killing HLS cleanup (PID {cleanup_pid})") + os.kill(cleanup_pid, signal.SIGKILL) + else: + logger.info(f"[INFO] Stopping HLS cleanup (PID {cleanup_pid})") + os.kill(cleanup_pid, signal.SIGTERM) + + # 等待进程结束 + for i in range(10): + if not is_process_running(cleanup_pid): + break + time.sleep(1) + + if is_process_running(cleanup_pid): + logger.warning("[WARN] Force killing HLS cleanup") + os.kill(cleanup_pid, signal.SIGKILL) + time.sleep(1) + + # 清理PID文件 + try: + os.remove(get_hls_cleanup_pid_file_path()) + except: + pass + + logger.info("[INFO] HLS cleanup stopped") + except Exception as e: + logger.error(f"[ERROR] Failed to stop HLS cleanup: {e}") + return True