# hls_downloader.py
# Manager process for downloading an HLS live stream as TS segments.
# It supervises an ffmpeg child process, retries on failure, and restarts
# the download once a day at a configurable hour.

import os
import sys
import json
import argparse
import signal
import subprocess
import time
from datetime import datetime, timedelta
from typing import Optional

from utils.logger import get_logger

logger = get_logger(__name__)

# Handle of the currently supervised ffmpeg child, or None when none is tracked.
ffmpeg_process: Optional[subprocess.Popen] = None
# Cleared by the signal handler to request a clean shutdown of the main loop.
running = True
# Marks whether ffmpeg needs to be restarted (daily rotation).
# NOTE(review): this flag is never read in this file; kept for compatibility.
should_restart = False

# Sliding window used to decide when to switch to the fallback stream type.
EXIT_WINDOW = timedelta(minutes=10)
# More than this many ffmpeg exits inside EXIT_WINDOW triggers the fallback.
MAX_EXITS_IN_WINDOW = 3

# ffmpeg exit tracking state (used to decide whether to switch stream_type).
state = {
    'exit_times': [],  # timestamps of recent ffmpeg exits
    'use_fallback_stream': False  # whether the next URL request uses stream_type=1
}


def parse_args():
    """Parse and validate the command-line arguments.

    Returns:
        The populated ``argparse.Namespace``.

    Exits with the standard argparse usage error (status 2) when
    ``--rotate-hour`` is outside 0-23 or ``--retry-interval`` is negative,
    instead of failing later with a traceback.
    """
    parser = argparse.ArgumentParser(description="HLS Downloader Manager")
    parser.add_argument("--index-code", required=True, help="Camera index code")
    parser.add_argument("--camera-name", required=True, help="Camera name")
    parser.add_argument("--camera-id", required=True, type=int, help="Camera ID")
    parser.add_argument("--hls-root-path", required=True, help="HLS root path")
    parser.add_argument("--rotate-hour", required=True, type=int,
                        help="Daily rotate hour (0-23)")
    parser.add_argument("--retry-interval", required=True, type=int,
                        help="Retry interval seconds")
    args = parser.parse_args()

    # datetime.replace(hour=...) raises ValueError for anything outside 0-23.
    if not 0 <= args.rotate_hour <= 23:
        parser.error("--rotate-hour must be between 0 and 23")
    # time.sleep() rejects negative durations.
    if args.retry_interval < 0:
        parser.error("--retry-interval must be non-negative")
    return args


# def init_logger(index_code: str, pid: int):
#     """Initialise a dedicated file logger."""
#     global logger
#     log_dir = "log"
#     if not os.path.exists(log_dir):
#         os.makedirs(log_dir)
#     log_file = os.path.join(log_dir, f"hls_downloader_{index_code}_{pid}.log")
#
#     # Create an independent logger instance
#     from logging import Logger, FileHandler, Formatter, INFO
#     logger = Logger(f"hls_downloader_{index_code}")
#     handler = FileHandler(log_file, encoding='utf-8')
#     handler.setFormatter(Formatter('%(asctime)s - %(levelname)s - %(message)s'))
#     logger.addHandler(handler)
#     logger.setLevel(INFO)
#
#     logger.info(f"Logger initialized, log file: {log_file}")


def get_hls_url(index_code: str) -> Optional[str]:
    """Fetch the HLS playback URL for a camera.

    Uses ``stream_type=1`` when the fallback flag in ``state`` is set and
    ``stream_type=0`` otherwise. The fallback flag is one-shot: it is cleared,
    together with the recorded exit times, as soon as it is consumed.

    Args:
        index_code: Camera index code passed to ``test_cam.get_camera_hls_url``.

    Returns:
        The URL string, or ``None`` when the lookup fails or raises.
    """
    stream_type = 0
    # Best effort: a problem while choosing the stream type must not prevent
    # the URL lookup itself, so fall back to stream_type=0 and carry on.
    try:
        use_fallback = state['use_fallback_stream']
        stream_type = 1 if use_fallback else 0
        if use_fallback:
            state['use_fallback_stream'] = False
            state['exit_times'].clear()  # start counting exits afresh
            logger.info("Using fallback stream_type=1")
    except Exception as e:
        logger.error(f"Error determining stream_type: {e}")

    try:
        # Imported lazily so that an import failure is logged and treated
        # like any other lookup failure.
        import test_cam
        result = test_cam.get_camera_hls_url(index_code, stream_type)
        if result.get("code") == "0" and result.get("data", {}).get("url"):
            return result["data"]["url"]
        logger.error(f"Failed to get HLS URL: {result}")
        return None
    except Exception as e:
        logger.error(f"Exception while getting HLS URL: {e}")
        return None


def create_session_folder(hls_root_path: str, index_code: str) -> str:
    """Create the output folder for one download session.

    The folder is ``<hls_root_path>/<index_code>/<YYYYmmdd_HHMMSS>``, named
    after the current local time.

    Returns:
        The path of the (created or already existing) session folder.

    Raises:
        OSError: If the directory cannot be created.
    """
    session_name = datetime.now().strftime("%Y%m%d_%H%M%S")
    index_folder = os.path.join(hls_root_path, index_code)
    session_folder = os.path.join(index_folder, session_name)

    os.makedirs(session_folder, exist_ok=True)
    logger.info(f"Created session folder: {session_folder}")

    return session_folder


def start_ffmpeg(m3u8_url: str, session_folder: str) -> Optional[subprocess.Popen]:
    """Start the ffmpeg download process and store it in ``ffmpeg_process``.

    Args:
        m3u8_url: Source playlist URL given to ffmpeg as input.
        session_folder: Folder receiving ``playlist.m3u8`` and the
            ``segment_%09d.ts`` files.

    Returns:
        The started ``Popen`` object, or ``None`` if ffmpeg could not be
        launched.
    """
    global ffmpeg_process

    try:
        segment_pattern = os.path.join(session_folder, "segment_%09d.ts")
        playlist_path = os.path.join(session_folder, "playlist.m3u8")

        cmd = [
            "ffmpeg",
            "-i", m3u8_url,
            "-c", "copy",
            "-hls_segment_filename", segment_pattern,
            playlist_path
        ]

        logger.info(f"Starting ffmpeg: {' '.join(cmd)}")

        # Discard ffmpeg's output. stdin is detached as well: ffmpeg reads
        # interactive commands from stdin by default, which is unwanted for
        # an unattended child sharing the manager's stdin.
        ffmpeg_process = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )

        logger.info(f"FFmpeg started with PID: {ffmpeg_process.pid}")
        return ffmpeg_process

    except Exception as e:
        logger.error(f"Failed to start ffmpeg: {e}")
        return None


def stop_ffmpeg():
    """Stop the tracked ffmpeg process, if any, and forget its handle.

    Sends terminate first, waits up to 5 seconds, then kills the process if
    it is still alive. Errors are logged, never raised.
    """
    global ffmpeg_process

    if ffmpeg_process and ffmpeg_process.poll() is None:
        logger.info(f"Stopping ffmpeg process (PID: {ffmpeg_process.pid})")
        try:
            ffmpeg_process.terminate()
            # Give ffmpeg a chance to finish cleanly.
            try:
                ffmpeg_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning("FFmpeg did not terminate gracefully, killing...")
                ffmpeg_process.kill()
                ffmpeg_process.wait()
            logger.info("FFmpeg process stopped")
        except Exception as e:
            logger.error(f"Error stopping ffmpeg: {e}")

    ffmpeg_process = None


def get_next_rotate_time(rotate_hour: int) -> datetime:
    """Return the next local time at which the clock reads ``rotate_hour``:00.

    Args:
        rotate_hour: Hour of day, 0-23.

    Returns:
        Today's rotation time if it is still in the future, otherwise
        tomorrow's.

    Raises:
        ValueError: If ``rotate_hour`` is outside 0-23.
    """
    now = datetime.now()
    rotate_time = now.replace(hour=rotate_hour, minute=0, second=0, microsecond=0)

    if now >= rotate_time:
        # Today's rotation time has already passed; use tomorrow's.
        rotate_time += timedelta(days=1)

    return rotate_time


def download_cycle(hls_root_path: str, index_code: str, retry_interval: int) -> bool:
    """Run one start-up attempt: fetch the URL, create a folder, launch ffmpeg.

    Args:
        hls_root_path: Root folder for all sessions.
        index_code: Camera index code.
        retry_interval: Unused here; the caller handles the retry delay.
            Kept so existing callers keep working.

    Returns:
        True if ffmpeg was started, False if any step failed.
    """
    # 1. Fetch the HLS URL
    m3u8_url = get_hls_url(index_code)
    if not m3u8_url:
        logger.error("Failed to get HLS URL, will retry later")
        return False

    # 2. Create the session folder
    try:
        session_folder = create_session_folder(hls_root_path, index_code)
    except Exception as e:
        logger.error(f"Failed to create session folder: {e}")
        return False

    # 3. Start ffmpeg
    proc = start_ffmpeg(m3u8_url, session_folder)
    if not proc:
        logger.error("Failed to start ffmpeg")
        return False

    return True


def signal_handler(signum, frame):
    """Handle SIGTERM/SIGINT by asking the main loop to stop."""
    global running
    logger.info(f"Received signal {signum}, shutting down...")
    running = False


def _record_ffmpeg_exit() -> None:
    """Record one ffmpeg exit and arm the fallback stream if exits pile up."""
    now = datetime.now()
    exit_times = state['exit_times']
    exit_times.append(now)
    # Drop records that have fallen out of the sliding window.
    exit_times[:] = [t for t in exit_times if now - t < EXIT_WINDOW]

    if len(exit_times) > MAX_EXITS_IN_WINDOW:
        state['use_fallback_stream'] = True
        logger.warning("FFmpeg exited 4+ times in 10 minutes, will use stream_type=1 next call")


def _sleep_while_running(seconds: float) -> None:
    """Sleep up to ``seconds``, returning early once shutdown is requested."""
    deadline = time.monotonic() + seconds
    while running:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Short slices keep the loop responsive to the ``running`` flag.
        time.sleep(min(1.0, remaining))


def main():
    """Entry point: supervise ffmpeg until a termination signal arrives.

    The loop (re)starts ffmpeg whenever it is not running, waits
    ``--retry-interval`` seconds after a failed start, and restarts the
    download once a day at ``--rotate-hour``.
    """
    global ffmpeg_process

    args = parse_args()

    # # Initialise logging
    # init_logger(args.index_code, os.getpid())

    # Register signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    logger.info(f"HLS Downloader starting for camera: {args.camera_name} (index: {args.index_code})")
    logger.info(f"Config: hls_root={args.hls_root_path}, rotate_hour={args.rotate_hour}, "
                f"retry_interval={args.retry_interval}")

    next_rotate_time = get_next_rotate_time(args.rotate_hour)
    logger.info(f"Next rotation scheduled at: {next_rotate_time}")

    while running:
        # No ffmpeg running: record an unexpected exit (if any) and try to start.
        if ffmpeg_process is None or ffmpeg_process.poll() is not None:
            if ffmpeg_process is not None:
                logger.warning(f"FFmpeg exited unexpectedly with code: {ffmpeg_process.returncode}")

                # Best effort: exit tracking must never take down the loop.
                try:
                    _record_ffmpeg_exit()
                except Exception as e:
                    logger.error(f"Error tracking exit times: {e}")

                # Forget the dead process. Otherwise a failed restart below
                # would leave it in place and the same exit would be logged
                # and counted again on every retry pass, falsely triggering
                # the fallback stream.
                ffmpeg_process = None

            success = download_cycle(
                args.hls_root_path,
                args.index_code,
                args.retry_interval
            )

            if not success:
                logger.info(f"Download failed, waiting {args.retry_interval} seconds before retry...")
                # Interruptible wait so a shutdown signal is honoured promptly.
                _sleep_while_running(args.retry_interval)
                continue

        # Check whether the daily rotation time has been reached.
        now = datetime.now()
        if now >= next_rotate_time:
            logger.info("Daily rotation triggered")

            # Stop the current ffmpeg; the next pass starts a new session.
            stop_ffmpeg()

            next_rotate_time = get_next_rotate_time(args.rotate_hour)
            logger.info(f"Next rotation scheduled at: {next_rotate_time}")

            time.sleep(1)
            continue

        # Short sleep to avoid busy-waiting.
        time.sleep(1)

    # Clean up before exiting.
    logger.info("Shutting down HLS Downloader...")
    stop_ffmpeg()
    logger.info("HLS Downloader stopped")


if __name__ == "__main__":
    main()