Files
SupervisorAI/hls_downloader.py
2026-02-28 12:12:43 +08:00

284 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

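# hls_downloader.py
# Manager process for downloading HLS streams as TS segments.
# Purpose: supervise an ffmpeg process that downloads an HLS live stream, with
# retry on failure, scheduled daily rotation, and cleanup of old files.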
import os
import sys
import json
import argparse
import signal
import subprocess
import shutil
import time
from datetime import datetime, timedelta
from typing import Optional
from utils.logger import get_logger
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Handle of the ffmpeg child process tracked by this module; None when no
# process is being tracked. Written by start_ffmpeg() and stop_ffmpeg().
ffmpeg_process: Optional[subprocess.Popen] = None
# Main-loop flag; signal_handler() sets it to False to request shutdown.
running = True
# NOTE(review): nothing in the visible code reads or assigns this flag; main()
# only lists it in its `global` statement. The rotation hour actually comes from
# --rotate-hour, not a fixed 3 a.m.
should_restart = False # Marks whether ffmpeg needs to be restarted (triggered at 3 a.m.)
def parse_args():
    """Parse and validate the command-line arguments.

    All options are required. Numeric options are range-checked here so that a
    bad value is reported as a normal argparse usage error (exit status 2)
    instead of surfacing later as a crash or as unintended file deletion:

    * ``--rotate-hour`` must be within 0-23 (it is later passed to
      ``datetime.replace(hour=...)``).
    * ``--retention-days`` must be >= 0 (a negative value would move the
      cleanup cutoff into the future).
    * ``--retry-interval`` must be >= 0 (it is later passed to ``time.sleep``).

    Returns:
        argparse.Namespace with ``index_code``, ``camera_name``, ``camera_id``,
        ``hls_root_path``, ``rotate_hour``, ``retention_days`` and
        ``retry_interval``.
    """

    def _int_in_range(minimum, maximum=None):
        """Build an argparse ``type=`` converter for a bounded integer."""

        def convert(text):
            try:
                value = int(text)
            except ValueError:
                raise argparse.ArgumentTypeError(f"invalid int value: {text!r}")
            if value < minimum:
                raise argparse.ArgumentTypeError(f"{value} is less than {minimum}")
            if maximum is not None and value > maximum:
                raise argparse.ArgumentTypeError(f"{value} is greater than {maximum}")
            return value

        return convert

    parser = argparse.ArgumentParser(description="HLS Downloader Manager")
    parser.add_argument("--index-code", required=True, help="Camera index code")
    parser.add_argument("--camera-name", required=True, help="Camera name")
    parser.add_argument("--camera-id", required=True, type=int, help="Camera ID")
    parser.add_argument("--hls-root-path", required=True, help="HLS root path")
    parser.add_argument(
        "--rotate-hour",
        required=True,
        type=_int_in_range(0, 23),
        help="Daily rotate hour (0-23)",
    )
    parser.add_argument(
        "--retention-days",
        required=True,
        type=_int_in_range(0),
        help="File retention days",
    )
    parser.add_argument(
        "--retry-interval",
        required=True,
        type=_int_in_range(0),
        help="Retry interval seconds",
    )
    return parser.parse_args()
# def init_logger(index_code: str, pid: int):
# """初始化日志记录器"""
# global logger
# log_dir = "log"
# if not os.path.exists(log_dir):
# os.makedirs(log_dir)
# log_file = os.path.join(log_dir, f"hls_downloader_{index_code}_{pid}.log")
#
# # 创建独立的logger实例
# from logging import Logger, FileHandler, Formatter, INFO
# logger = Logger(f"hls_downloader_{index_code}")
# handler = FileHandler(log_file, encoding='utf-8')
# handler.setFormatter(Formatter('%(asctime)s - %(levelname)s - %(message)s'))
# logger.addHandler(handler)
# logger.setLevel(INFO)
#
# logger.info(f"Logger initialized, log file: {log_file}")
def get_hls_url(index_code: str) -> Optional[str]:
    """Fetch the HLS playback URL for a camera.

    Calls ``test_cam.get_camera_hls_url`` and accepts the response only when
    its ``code`` equals the string ``"0"`` and ``data.url`` is non-empty.

    Args:
        index_code: Camera index code passed through to ``test_cam``.

    Returns:
        The URL string, or None when the response is rejected or any exception
        is raised (both cases are logged as errors).
    """
    try:
        # Imported lazily so an import failure is logged like any other error.
        import test_cam

        response = test_cam.get_camera_hls_url(index_code)
        if response.get("code") != "0" or not response.get("data", {}).get("url"):
            logger.error(f"Failed to get HLS URL: {response}")
            return None
        return response["data"]["url"]
    except Exception as e:
        logger.error(f"Exception while getting HLS URL: {e}")
        return None
def create_session_folder(hls_root_path: str, index_code: str) -> str:
    """Create and return a fresh, timestamp-named folder for one download session.

    The folder is ``<hls_root_path>/<index_code>/<YYYYmmdd_HHMMSS>`` using the
    current local time; missing parent directories are created as needed.

    Args:
        hls_root_path: Root directory for all HLS downloads.
        index_code: Camera index code, used as the per-camera subdirectory.

    Returns:
        Path of the session folder.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = os.path.join(hls_root_path, index_code, stamp)
    os.makedirs(target, exist_ok=True)
    logger.info(f"Created session folder: {target}")
    return target
def start_ffmpeg(m3u8_url: str, session_folder: str) -> Optional[subprocess.Popen]:
    """Launch ffmpeg to copy an HLS stream into ``session_folder``.

    ffmpeg writes ``playlist.m3u8`` plus ``segment_%09d.ts`` files into the
    folder, copying the streams without re-encoding. The new process handle is
    also stored in the module-level ``ffmpeg_process``.

    Args:
        m3u8_url: Source HLS playlist URL.
        session_folder: Existing directory that receives the output files.

    Returns:
        The ``subprocess.Popen`` handle, or None if the process could not be
        started (the error is logged).
    """
    global ffmpeg_process
    try:
        segment_pattern = os.path.join(session_folder, "segment_%09d.ts")
        playlist_path = os.path.join(session_folder, "playlist.m3u8")
        cmd = [
            "ffmpeg",
            "-i", m3u8_url,
            "-c", "copy",
            "-hls_segment_filename", segment_pattern,
            playlist_path,
        ]
        logger.info(f"Starting ffmpeg: {' '.join(cmd)}")
        # All three standard streams are detached. Without stdin=DEVNULL ffmpeg
        # inherits the manager's stdin and keeps its interactive console
        # handling enabled, which can stall it or steal terminal input when the
        # manager runs in the background.
        ffmpeg_process = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        logger.info(f"FFmpeg started with PID: {ffmpeg_process.pid}")
        return ffmpeg_process
    except Exception as e:
        logger.error(f"Failed to start ffmpeg: {e}")
        return None
def stop_ffmpeg():
    """Stop the tracked ffmpeg process, if it is still running.

    Sends a terminate request and waits up to 5 seconds; if ffmpeg has not
    exited by then it is killed. Errors are logged, never raised. The
    module-level ``ffmpeg_process`` handle is cleared afterwards.
    """
    global ffmpeg_process
    proc = ffmpeg_process
    still_running = bool(proc) and proc.poll() is None
    if still_running:
        logger.info(f"Stopping ffmpeg process (PID: {proc.pid})")
        try:
            proc.terminate()
            # Give ffmpeg a short grace period before escalating to kill.
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning("FFmpeg did not terminate gracefully, killing...")
                proc.kill()
                proc.wait()
            logger.info("FFmpeg process stopped")
        except Exception as e:
            logger.error(f"Error stopping ffmpeg: {e}")
    ffmpeg_process = None
def cleanup_old_folders(hls_root_path: str, index_code: str, retention_days: int):
    """Delete session folders older than the retention window.

    Only sub-directories of ``<hls_root_path>/<index_code>`` whose name starts
    with a valid ``yyyyMMdd`` date (the part before the first ``_``) are
    considered. A folder is removed when that date is earlier than today minus
    ``retention_days``. Directories with any other name are left untouched and
    reported with a warning, so unrelated folders (for example ``__pycache__``
    or ``.cache``) can never be deleted by accident.

    Args:
        hls_root_path: Root directory for all HLS downloads.
        index_code: Camera index code (per-camera subdirectory).
        retention_days: Number of days to keep; must not be negative.

    All errors are logged; nothing is raised to the caller.
    """
    try:
        if retention_days < 0:
            # A negative window would put the cutoff in the future and wipe
            # every session folder, including today's.
            logger.error(f"Error cleaning up old folders: invalid retention_days {retention_days}")
            return
        index_folder = os.path.join(hls_root_path, index_code)
        if not os.path.exists(index_folder):
            return
        cutoff_date = (datetime.now() - timedelta(days=retention_days)).date()
        deleted_count = 0
        for folder_name in os.listdir(index_folder):
            folder_path = os.path.join(index_folder, folder_name)
            if not os.path.isdir(folder_path):
                continue
            # Parse the date from the folder name (yyyyMMdd_HHmmss).
            date_str = folder_name.split("_")[0]
            try:
                if len(date_str) != 8 or any(ch not in "0123456789" for ch in date_str):
                    raise ValueError("expected a yyyyMMdd prefix")
                folder_date = datetime.strptime(date_str, "%Y%m%d").date()
            except ValueError as e:
                logger.warning(f"Failed to parse folder date {folder_name}: {e}")
                continue
            if folder_date >= cutoff_date:
                continue
            try:
                shutil.rmtree(folder_path)
            except OSError as e:
                # Keep going: one undeletable folder must not block the rest.
                logger.warning(f"Failed to delete old folder {folder_path}: {e}")
                continue
            logger.info(f"Deleted old folder: {folder_path}")
            deleted_count += 1
        if deleted_count > 0:
            logger.info(f"Cleaned up {deleted_count} old folders")
    except Exception as e:
        logger.error(f"Error cleaning up old folders: {e}")
def get_next_rotate_time(rotate_hour: int) -> datetime:
    """Return the next local time at which the clock reads ``rotate_hour``:00:00.

    Args:
        rotate_hour: Hour of day, passed to ``datetime.replace(hour=...)``.

    Returns:
        Today's occurrence if it is still in the future, otherwise tomorrow's.
    """
    current = datetime.now()
    todays_slot = current.replace(hour=rotate_hour, minute=0, second=0, microsecond=0)
    # Today's slot has been reached or passed: schedule the same hour tomorrow.
    return todays_slot if current < todays_slot else todays_slot + timedelta(days=1)
def download_cycle(hls_root_path: str, index_code: str, retry_interval: int) -> bool:
    """Run one start-up attempt: resolve the URL, create a folder, launch ffmpeg.

    Args:
        hls_root_path: Root directory for all HLS downloads.
        index_code: Camera index code.
        retry_interval: Accepted for the caller's convenience; not used here.

    Returns:
        True when ffmpeg was started, False when any step failed (each failure
        is logged).
    """
    # Step 1: resolve the HLS address.
    stream_url = get_hls_url(index_code)
    if stream_url:
        # Step 2: create the session folder.
        try:
            folder = create_session_folder(hls_root_path, index_code)
        except Exception as e:
            logger.error(f"Failed to create session folder: {e}")
            return False
        # Step 3: launch ffmpeg.
        if start_ffmpeg(stream_url, folder):
            return True
        logger.error("Failed to start ffmpeg")
        return False
    logger.error("Failed to get HLS URL, will retry later")
    return False
def signal_handler(signum, frame):
    """Handle a termination signal by asking the main loop to exit.

    Args:
        signum: Number of the received signal (logged).
        frame: Current stack frame supplied by the signal module; unused.
    """
    global running
    running = False
    logger.info(f"Received signal {signum}, shutting down...")
def _sleep_while_running(seconds: float) -> None:
    """Sleep for up to ``seconds``, returning early once shutdown is requested."""
    deadline = time.monotonic() + seconds
    while running:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return
        # Sleep in short slices so a shutdown request is noticed within ~1s.
        time.sleep(min(remaining, 1.0))


def main():
    """Entry point: keep ffmpeg downloading until a termination signal arrives.

    The loop:
      * starts ffmpeg whenever no live process is tracked, waiting
        ``--retry-interval`` seconds after a failed start attempt;
      * if ffmpeg died sooner than ``--retry-interval`` seconds after launch,
        waits out the remainder first, so a broken stream cannot cause a
        once-per-second relaunch loop that creates a new session folder each
        time;
      * once a day at ``--rotate-hour`` stops ffmpeg, removes expired session
        folders and lets the next iteration start a fresh session;
      * on SIGTERM/SIGINT stops ffmpeg and returns.
    """
    global running, should_restart, ffmpeg_process
    args = parse_args()
    # Register the signal handlers that request a clean shutdown.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    logger.info(f"HLS Downloader starting for camera: {args.camera_name} (index: {args.index_code})")
    logger.info(f"Config: hls_root={args.hls_root_path}, rotate_hour={args.rotate_hour}, "
                f"retention_days={args.retention_days}, retry_interval={args.retry_interval}")
    # Compute the first rotation time.
    next_rotate_time = get_next_rotate_time(args.rotate_hour)
    logger.info(f"Next rotation scheduled at: {next_rotate_time}")

    # time.monotonic() value of the most recent successful ffmpeg launch.
    last_start: Optional[float] = None

    while running:
        # No live ffmpeg process: (re)start the download.
        if ffmpeg_process is None or ffmpeg_process.poll() is not None:
            if ffmpeg_process is not None:
                logger.warning(f"FFmpeg exited unexpectedly with code: {ffmpeg_process.returncode}")
                # Drop the dead handle so the exit is reported only once, even
                # if the following start attempts keep failing.
                ffmpeg_process = None
                if last_start is not None:
                    uptime = time.monotonic() - last_start
                    last_start = None
                    if uptime < args.retry_interval:
                        backoff = args.retry_interval - uptime
                        logger.info(f"FFmpeg exited after {uptime:.1f}s, "
                                    f"waiting {backoff:.1f}s before restart...")
                        _sleep_while_running(backoff)
                        if not running:
                            break
            if download_cycle(args.hls_root_path, args.index_code, args.retry_interval):
                last_start = time.monotonic()
            else:
                logger.info(f"Download failed, waiting {args.retry_interval} seconds before retry...")
                _sleep_while_running(args.retry_interval)
                continue

        # Daily rotation: stop, clean up, reschedule; the next iteration
        # starts a new ffmpeg session.
        if datetime.now() >= next_rotate_time:
            logger.info("Daily rotation triggered")
            stop_ffmpeg()
            # Intentional stop: make sure it is not reported as a crash.
            ffmpeg_process = None
            last_start = None
            cleanup_old_folders(args.hls_root_path, args.index_code, args.retention_days)
            next_rotate_time = get_next_rotate_time(args.rotate_hour)
            logger.info(f"Next rotation scheduled at: {next_rotate_time}")

        # Short pause to avoid a busy loop.
        _sleep_while_running(1)

    # Clean up before exiting.
    logger.info("Shutting down HLS Downloader...")
    stop_ffmpeg()
    logger.info("HLS Downloader stopped")
if __name__ == "__main__":
main()