# hls_downloader.py # HLS分片TS下载管理进程 # 功能:管理ffmpeg下载HLS直播流,支持异常重试、定时轮换、文件清理 import os import sys import json import argparse import signal import subprocess import shutil import time from datetime import datetime, timedelta from typing import Optional from utils.logger import get_logger logger = get_logger(__name__) ffmpeg_process: Optional[subprocess.Popen] = None running = True should_restart = False # 标记是否需要重新启动ffmpeg(凌晨3点触发) def parse_args(): """解析命令行参数""" parser = argparse.ArgumentParser(description="HLS Downloader Manager") parser.add_argument("--index-code", required=True, help="Camera index code") parser.add_argument("--camera-name", required=True, help="Camera name") parser.add_argument("--camera-id", required=True, type=int, help="Camera ID") parser.add_argument("--hls-root-path", required=True, help="HLS root path") parser.add_argument("--rotate-hour", required=True, type=int, help="Daily rotate hour (0-23)") parser.add_argument("--retention-days", required=True, type=int, help="File retention days") parser.add_argument("--retry-interval", required=True, type=int, help="Retry interval seconds") return parser.parse_args() # def init_logger(index_code: str, pid: int): # """初始化日志记录器""" # global logger # log_dir = "log" # if not os.path.exists(log_dir): # os.makedirs(log_dir) # log_file = os.path.join(log_dir, f"hls_downloader_{index_code}_{pid}.log") # # # 创建独立的logger实例 # from logging import Logger, FileHandler, Formatter, INFO # logger = Logger(f"hls_downloader_{index_code}") # handler = FileHandler(log_file, encoding='utf-8') # handler.setFormatter(Formatter('%(asctime)s - %(levelname)s - %(message)s')) # logger.addHandler(handler) # logger.setLevel(INFO) # # logger.info(f"Logger initialized, log file: {log_file}") def get_hls_url(index_code: str) -> Optional[str]: """获取HLS播放地址""" try: import test_cam result = test_cam.get_camera_hls_url(index_code) if result.get("code") == "0" and result.get("data", {}).get("url"): return result["data"]["url"] else: logger.error(f"Failed to get HLS URL: {result}") return None except Exception as e: logger.error(f"Exception while getting HLS URL: {e}") return None def create_session_folder(hls_root_path: str, index_code: str) -> str: """创建下载会话文件夹""" session_name = datetime.now().strftime("%Y%m%d_%H%M%S") index_folder = os.path.join(hls_root_path, index_code) session_folder = os.path.join(index_folder, session_name) os.makedirs(session_folder, exist_ok=True) logger.info(f"Created session folder: {session_folder}") return session_folder def start_ffmpeg(m3u8_url: str, session_folder: str) -> Optional[subprocess.Popen]: """启动ffmpeg下载进程""" global ffmpeg_process try: segment_pattern = os.path.join(session_folder, "segment_%09d.ts") playlist_path = os.path.join(session_folder, "playlist.m3u8") cmd = [ "ffmpeg", "-i", m3u8_url, "-c", "copy", "-hls_segment_filename", segment_pattern, playlist_path ] logger.info(f"Starting ffmpeg: {' '.join(cmd)}") # 启动ffmpeg,丢弃输出 ffmpeg_process = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) logger.info(f"FFmpeg started with PID: {ffmpeg_process.pid}") return ffmpeg_process except Exception as e: logger.error(f"Failed to start ffmpeg: {e}") return None def stop_ffmpeg(): """停止ffmpeg进程""" global ffmpeg_process if ffmpeg_process and ffmpeg_process.poll() is None: logger.info(f"Stopping ffmpeg process (PID: {ffmpeg_process.pid})") try: ffmpeg_process.terminate() # 等待进程结束 try: ffmpeg_process.wait(timeout=5) except subprocess.TimeoutExpired: logger.warning("FFmpeg did not terminate gracefully, killing...") ffmpeg_process.kill() ffmpeg_process.wait() logger.info("FFmpeg process stopped") except Exception as e: logger.error(f"Error stopping ffmpeg: {e}") ffmpeg_process = None def cleanup_old_folders(hls_root_path: str, index_code: str, retention_days: int): """清理过期的下载文件夹""" try: index_folder = os.path.join(hls_root_path, index_code) if not os.path.exists(index_folder): return cutoff_date = datetime.now() - timedelta(days=retention_days) cutoff_str = cutoff_date.strftime("%Y%m%d") deleted_count = 0 for folder_name in os.listdir(index_folder): folder_path = os.path.join(index_folder, folder_name) if os.path.isdir(folder_path): # 从文件夹名称解析日期 (yyyyMMdd_HHmmss) try: date_str = folder_name.split("_")[0] if date_str < cutoff_str: shutil.rmtree(folder_path) logger.info(f"Deleted old folder: {folder_path}") deleted_count += 1 except Exception as e: logger.warning(f"Failed to parse folder date {folder_name}: {e}") if deleted_count > 0: logger.info(f"Cleaned up {deleted_count} old folders") except Exception as e: logger.error(f"Error cleaning up old folders: {e}") def get_next_rotate_time(rotate_hour: int) -> datetime: """计算下次轮换时间""" now = datetime.now() rotate_time = now.replace(hour=rotate_hour, minute=0, second=0, microsecond=0) if now >= rotate_time: # 已经过了今天的轮换时间,计算明天的 rotate_time += timedelta(days=1) return rotate_time def download_cycle(hls_root_path: str, index_code: str, retry_interval: int) -> bool: """ 执行一次下载流程 返回:True表示成功启动ffmpeg,False表示失败 """ # 1. 获取HLS地址 m3u8_url = get_hls_url(index_code) if not m3u8_url: logger.error("Failed to get HLS URL, will retry later") return False # 2. 创建会话文件夹 try: session_folder = create_session_folder(hls_root_path, index_code) except Exception as e: logger.error(f"Failed to create session folder: {e}") return False # 3. 启动ffmpeg proc = start_ffmpeg(m3u8_url, session_folder) if not proc: logger.error("Failed to start ffmpeg") return False return True def signal_handler(signum, frame): """信号处理器""" global running logger.info(f"Received signal {signum}, shutting down...") running = False def main(): global running, should_restart, ffmpeg_process args = parse_args() # # 初始化日志 # init_logger(args.index_code, os.getpid()) # 注册信号处理器 signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) logger.info(f"HLS Downloader starting for camera: {args.camera_name} (index: {args.index_code})") logger.info(f"Config: hls_root={args.hls_root_path}, rotate_hour={args.rotate_hour}, " f"retention_days={args.retention_days}, retry_interval={args.retry_interval}") # 计算下次轮换时间 next_rotate_time = get_next_rotate_time(args.rotate_hour) logger.info(f"Next rotation scheduled at: {next_rotate_time}") # 主循环 while running: # 如果没有运行中的ffmpeg,尝试启动 if ffmpeg_process is None or ffmpeg_process.poll() is not None: if ffmpeg_process is not None and ffmpeg_process.poll() is not None: logger.warning(f"FFmpeg exited unexpectedly with code: {ffmpeg_process.returncode}") # 尝试启动下载 success = download_cycle( args.hls_root_path, args.index_code, args.retry_interval ) if not success: logger.info(f"Download failed, waiting {args.retry_interval} seconds before retry...") time.sleep(args.retry_interval) continue # 检查是否到达轮换时间 now = datetime.now() if now >= next_rotate_time: logger.info("Daily rotation triggered") # 停止当前ffmpeg stop_ffmpeg() # 清理旧文件 cleanup_old_folders(args.hls_root_path, args.index_code, args.retention_days) # 计算下次轮换时间 next_rotate_time = get_next_rotate_time(args.rotate_hour) logger.info(f"Next rotation scheduled at: {next_rotate_time}") # 继续循环,会重新启动ffmpeg time.sleep(1) continue # 短暂休眠,避免CPU占用过高 time.sleep(1) # 退出前清理 logger.info("Shutting down HLS Downloader...") stop_ffmpeg() logger.info("HLS Downloader stopped") if __name__ == "__main__": main()