修改清理文件逻辑,改到main_start.py中

This commit is contained in:
zqc
2026-03-09 12:17:01 +08:00
parent cbf06ddc22
commit 74f555f38c
3 changed files with 277 additions and 42 deletions

139
hls_cleanup.py Normal file
View File

@@ -0,0 +1,139 @@
# hls_cleanup.py
# HLS文件夹清理进程
# 功能定时清理过期的HLS会话文件夹支持浮点数天数不依赖摄像头配置
import os
import sys
import argparse
import signal
import shutil
import time
from datetime import datetime, timedelta
from typing import List
from utils.logger import get_logger
logger = get_logger(__name__)
running = True
def parse_args():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description="HLS Folder Cleanup Service")
parser.add_argument("--hls-root-path", required=True, help="HLS root path")
parser.add_argument("--retention-days", required=True, type=float,
help="Retention days (supports float, e.g. 0.5 for 12 hours)")
parser.add_argument("--interval", required=True, type=int,
help="Cleanup interval in hours")
return parser.parse_args()
def parse_folder_time(folder_name: str) -> datetime:
"""
从文件夹名称解析时间
格式: yyyyMMdd_HHmmss
"""
try:
date_str, time_str = folder_name.split("_")
return datetime.strptime(f"{date_str}{time_str}", "%Y%m%d%H%M%S")
except (ValueError, IndexError) as e:
raise ValueError(f"Invalid folder name format: {folder_name}") from e
def cleanup_hls_folders(hls_root_path: str, retention_days: float) -> tuple:
"""
清理过期的HLS会话文件夹
Args:
hls_root_path: HLS根目录
retention_days: 保留天数(支持浮点数)
Returns:
(deleted_count, error_count) 删除数量和错误数量
"""
if not os.path.exists(hls_root_path):
logger.info(f"HLS root path does not exist: {hls_root_path}")
return 0, 0
cutoff_time = datetime.now() - timedelta(days=retention_days)
deleted_count = 0
error_count = 0
# 遍历所有摄像头目录
for camera_folder in os.listdir(hls_root_path):
camera_path = os.path.join(hls_root_path, camera_folder)
if not os.path.isdir(camera_path):
continue
# 遍历该摄像头下的所有会话文件夹
for session_folder in os.listdir(camera_path):
session_path = os.path.join(camera_path, session_folder)
if not os.path.isdir(session_path):
continue
try:
# 解析文件夹名称获取时间
folder_time = parse_folder_time(session_folder)
if folder_time < cutoff_time:
shutil.rmtree(session_path)
logger.info(f"Deleted: {session_path} (created: {folder_time})")
deleted_count += 1
except ValueError as e:
# 文件夹名称格式不匹配,跳过
logger.debug(f"Skipped: {session_path} ({e})")
except Exception as e:
logger.warning(f"Failed to delete {session_path}: {e}")
error_count += 1
return deleted_count, error_count
def signal_handler(signum, frame):
"""信号处理器"""
global running
logger.info(f"Received signal {signum}, shutting down...")
running = False
def main():
global running
args = parse_args()
# 注册信号处理器
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
logger.info(f"HLS Cleanup Service starting")
logger.info(f"Config: hls_root={args.hls_root_path}, "
f"retention_days={args.retention_days}, interval={args.interval}h")
# 主循环
while running:
try:
logger.info(f"Starting cleanup scan...")
deleted, errors = cleanup_hls_folders(args.hls_root_path, args.retention_days)
if deleted > 0 or errors > 0:
logger.info(f"Cleanup completed: {deleted} deleted, {errors} errors "
f"(retention: {args.retention_days} days)")
else:
logger.info(f"Cleanup scan completed, no folders to delete "
f"(retention: {args.retention_days} days)")
except Exception as e:
logger.error(f"Error during cleanup: {e}")
# 等待下一次轮询(每小时检查一次)
for _ in range(args.interval * 3600):
if not running:
break
time.sleep(1)
logger.info("HLS Cleanup Service stopped")
if __name__ == "__main__":
main()

View File

@@ -1,6 +1,6 @@
# hls_downloader.py # hls_downloader.py
# HLS分片TS下载管理进程 # HLS分片TS下载管理进程
# 功能管理ffmpeg下载HLS直播流支持异常重试、定时轮换、文件清理 # 功能管理ffmpeg下载HLS直播流支持异常重试、定时轮换
import os import os
import sys import sys
@@ -8,7 +8,6 @@ import json
import argparse import argparse
import signal import signal
import subprocess import subprocess
import shutil
import time import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional from typing import Optional
@@ -35,7 +34,6 @@ def parse_args():
parser.add_argument("--camera-id", required=True, type=int, help="Camera ID") parser.add_argument("--camera-id", required=True, type=int, help="Camera ID")
parser.add_argument("--hls-root-path", required=True, help="HLS root path") parser.add_argument("--hls-root-path", required=True, help="HLS root path")
parser.add_argument("--rotate-hour", required=True, type=int, help="Daily rotate hour (0-23)") parser.add_argument("--rotate-hour", required=True, type=int, help="Daily rotate hour (0-23)")
parser.add_argument("--retention-days", required=True, type=int, help="File retention days")
parser.add_argument("--retry-interval", required=True, type=int, help="Retry interval seconds") parser.add_argument("--retry-interval", required=True, type=int, help="Retry interval seconds")
return parser.parse_args() return parser.parse_args()
@@ -152,37 +150,6 @@ def stop_ffmpeg():
ffmpeg_process = None ffmpeg_process = None
def cleanup_old_folders(hls_root_path: str, index_code: str, retention_days: int):
"""清理过期的下载文件夹"""
try:
index_folder = os.path.join(hls_root_path, index_code)
if not os.path.exists(index_folder):
return
cutoff_date = datetime.now() - timedelta(days=retention_days)
cutoff_str = cutoff_date.strftime("%Y%m%d")
deleted_count = 0
for folder_name in os.listdir(index_folder):
folder_path = os.path.join(index_folder, folder_name)
if os.path.isdir(folder_path):
# 从文件夹名称解析日期 (yyyyMMdd_HHmmss)
try:
date_str = folder_name.split("_")[0]
if date_str < cutoff_str:
shutil.rmtree(folder_path)
logger.info(f"Deleted old folder: {folder_path}")
deleted_count += 1
except Exception as e:
logger.warning(f"Failed to parse folder date {folder_name}: {e}")
if deleted_count > 0:
logger.info(f"Cleaned up {deleted_count} old folders")
except Exception as e:
logger.error(f"Error cleaning up old folders: {e}")
def get_next_rotate_time(rotate_hour: int) -> datetime: def get_next_rotate_time(rotate_hour: int) -> datetime:
"""计算下次轮换时间""" """计算下次轮换时间"""
now = datetime.now() now = datetime.now()
@@ -243,7 +210,7 @@ def main():
logger.info(f"HLS Downloader starting for camera: {args.camera_name} (index: {args.index_code})") logger.info(f"HLS Downloader starting for camera: {args.camera_name} (index: {args.index_code})")
logger.info(f"Config: hls_root={args.hls_root_path}, rotate_hour={args.rotate_hour}, " logger.info(f"Config: hls_root={args.hls_root_path}, rotate_hour={args.rotate_hour}, "
f"retention_days={args.retention_days}, retry_interval={args.retry_interval}") f"retry_interval={args.retry_interval}")
# 计算下次轮换时间 # 计算下次轮换时间
next_rotate_time = get_next_rotate_time(args.rotate_hour) next_rotate_time = get_next_rotate_time(args.rotate_hour)
@@ -288,9 +255,6 @@ def main():
# 停止当前ffmpeg # 停止当前ffmpeg
stop_ffmpeg() stop_ffmpeg()
# 清理旧文件
cleanup_old_folders(args.hls_root_path, args.index_code, args.retention_days)
# 计算下次轮换时间 # 计算下次轮换时间
next_rotate_time = get_next_rotate_time(args.rotate_hour) next_rotate_time = get_next_rotate_time(args.rotate_hour)
logger.info(f"Next rotation scheduled at: {next_rotate_time}") logger.info(f"Next rotation scheduled at: {next_rotate_time}")

View File

@@ -25,6 +25,9 @@ PID_DIR = "pids"
# HLS下载器PID文件前缀 # HLS下载器PID文件前缀
HLS_DOWNLOADER_PID_PREFIX = "hls_downloader_" HLS_DOWNLOADER_PID_PREFIX = "hls_downloader_"
# HLS清理进程PID文件
HLS_CLEANUP_PID_FILE = "hls_cleanup.pid"
def load_debug_mode(config_path: str = "config.yaml") -> bool: def load_debug_mode(config_path: str = "config.yaml") -> bool:
"""从配置文件读取调试模式""" """从配置文件读取调试模式"""
@@ -164,6 +167,36 @@ def read_hls_downloader_pid(index_code: str):
return None return None
def get_hls_cleanup_pid_file_path() -> str:
"""获取HLS清理进程的 PID 文件路径"""
return os.path.join(PID_DIR, HLS_CLEANUP_PID_FILE)
def save_hls_cleanup_pid(pid: int):
"""保存HLS清理进程ID到文件"""
ensure_pid_dir()
pid_file = get_hls_cleanup_pid_file_path()
try:
with open(pid_file, "w") as f:
f.write(str(pid))
logger.info(f"[INFO] Saved HLS cleanup PID {pid} to {pid_file}")
except Exception as e:
logger.error(f"[ERROR] Failed to save HLS cleanup PID file: {e}")
def read_hls_cleanup_pid():
"""从PID文件读取HLS清理进程ID"""
pid_file = get_hls_cleanup_pid_file_path()
try:
with open(pid_file, "r") as f:
return int(f.read().strip())
except FileNotFoundError:
return None
except Exception as e:
logger.error(f"[ERROR] Failed to read HLS cleanup PID file: {e}")
return None
def is_process_running(pid: int): def is_process_running(pid: int):
"""检查进程是否在运行""" """检查进程是否在运行"""
try: try:
@@ -248,7 +281,6 @@ def start_hls_downloader(camera: dict, hls_config: dict) -> Optional[subprocess.
"--camera-id", str(camera_id), "--camera-id", str(camera_id),
"--hls-root-path", hls_config.get("hls_root_path", ""), "--hls-root-path", hls_config.get("hls_root_path", ""),
"--rotate-hour", str(hls_config.get("daily_rotate_hour", 3)), "--rotate-hour", str(hls_config.get("daily_rotate_hour", 3)),
"--retention-days", str(hls_config.get("retention_days", 3)),
"--retry-interval", str(hls_config.get("retry_interval", 10)) "--retry-interval", str(hls_config.get("retry_interval", 10))
] ]
@@ -262,6 +294,39 @@ def start_hls_downloader(camera: dict, hls_config: dict) -> Optional[subprocess.
return process return process
def start_hls_cleanup(hls_config: dict) -> Optional[subprocess.Popen]:
"""启动HLS清理进程"""
hls_root_path = hls_config.get("hls_root_path", "")
retention_days = hls_config.get("retention_days", 3)
if not hls_root_path:
logger.warning("[WARN] HLS root path not configured, skipping HLS cleanup")
return None
# 检查是否已经在运行
pid = read_hls_cleanup_pid()
if pid and is_process_running(pid):
logger.warning(f"[WARN] HLS cleanup is already running with PID {pid}")
return None
cmd = [
sys.executable,
"hls_cleanup.py",
"--hls-root-path", hls_root_path,
"--retention-days", str(retention_days),
"--interval", "1" # 每小时清理一次
]
logger.info(f"[INFO] Starting HLS cleanup service (retention: {retention_days} days)")
if DOWNLOADER_DEBUG_MODE:
process = subprocess.Popen(cmd)
else:
process = subprocess.Popen(cmd, start_new_session=True)
return process
def start_service(): def start_service():
"""启动所有服务组""" """启动所有服务组"""
config_path = "config.yaml" config_path = "config.yaml"
@@ -325,6 +390,18 @@ def start_service():
logger.info(f"[INFO] Started {started_count}/{len(service_groups)} service groups") logger.info(f"[INFO] Started {started_count}/{len(service_groups)} service groups")
logger.info(f"[INFO] Started {downloader_count} HLS downloaders") logger.info(f"[INFO] Started {downloader_count} HLS downloaders")
# 启动HLS清理进程
cleanup_process = None
if hls_config.get("hls_root_path"):
try:
cleanup_process = start_hls_cleanup(hls_config)
if cleanup_process:
time.sleep(0.3)
save_hls_cleanup_pid(cleanup_process.pid)
logger.info(f"[INFO] HLS cleanup service started with PID {cleanup_process.pid}")
except Exception as e:
logger.error(f"[ERROR] Failed to start HLS cleanup service: {e}")
# 根据各自的调试模式决定是否等待子进程 # 根据各自的调试模式决定是否等待子进程
if DEBUG_MODE and processes: if DEBUG_MODE and processes:
logger.info("[DEBUG] Waiting for service processes...") logger.info("[DEBUG] Waiting for service processes...")
@@ -336,6 +413,10 @@ def start_service():
for process, name in downloader_processes: for process, name in downloader_processes:
process.wait() process.wait()
if DOWNLOADER_DEBUG_MODE and cleanup_process:
logger.info("[DEBUG] Waiting for cleanup process...")
cleanup_process.wait()
return started_count > 0 return started_count > 0
@@ -398,6 +479,18 @@ def status_service():
if downloader_pids: if downloader_pids:
logger.info(f"[INFO] {downloader_running}/{len(downloader_pids)} HLS downloaders running") logger.info(f"[INFO] {downloader_running}/{len(downloader_pids)} HLS downloaders running")
# 检查HLS清理进程状态
cleanup_pid = read_hls_cleanup_pid()
if cleanup_pid and is_process_running(cleanup_pid):
logger.info(f"[INFO] HLS cleanup service is running with PID {cleanup_pid}")
else:
logger.info(f"[INFO] HLS cleanup service is not running")
if cleanup_pid:
try:
os.remove(get_hls_cleanup_pid_file_path())
except:
pass
return running_count > 0 return running_count > 0
@@ -511,6 +604,45 @@ def stop_service(force=False):
if downloader_pids: if downloader_pids:
logger.info(f"[INFO] Stopped {downloader_stopped}/{len(downloader_pids)} HLS downloaders") logger.info(f"[INFO] Stopped {downloader_stopped}/{len(downloader_pids)} HLS downloaders")
# 停止HLS清理进程
cleanup_pid = read_hls_cleanup_pid()
if cleanup_pid:
if not is_process_running(cleanup_pid):
logger.warning(f"[WARN] HLS cleanup PID {cleanup_pid} not running, cleaning up")
try:
os.remove(get_hls_cleanup_pid_file_path())
except:
pass
else:
try:
if force:
logger.info(f"[INFO] Force killing HLS cleanup (PID {cleanup_pid})")
os.kill(cleanup_pid, signal.SIGKILL)
else:
logger.info(f"[INFO] Stopping HLS cleanup (PID {cleanup_pid})")
os.kill(cleanup_pid, signal.SIGTERM)
# 等待进程结束
for i in range(10):
if not is_process_running(cleanup_pid):
break
time.sleep(1)
if is_process_running(cleanup_pid):
logger.warning("[WARN] Force killing HLS cleanup")
os.kill(cleanup_pid, signal.SIGKILL)
time.sleep(1)
# 清理PID文件
try:
os.remove(get_hls_cleanup_pid_file_path())
except:
pass
logger.info("[INFO] HLS cleanup stopped")
except Exception as e:
logger.error(f"[ERROR] Failed to stop HLS cleanup: {e}")
return True return True