From 1cbe95c91ba0518b940bed1b65647b3bcb0b695e Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Sat, 28 Feb 2026 12:12:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ehls=E4=B8=8B=E8=BD=BD?= =?UTF-8?q?=E8=BF=9B=E7=A8=8B=EF=BC=8C=E5=BE=85=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yaml | 3 + hls_downloader.py | 445 +++++++++++++++++++++++++++------------------- main_start.py | 227 +++++++++++++++++++++-- 3 files changed, 483 insertions(+), 192 deletions(-) diff --git a/config.yaml b/config.yaml index b375589..18dc62b 100644 --- a/config.yaml +++ b/config.yaml @@ -35,6 +35,9 @@ database: debug_mode: true alert_push_url: "http://123.57.151.210:10000/picenter/websocket/test/process" hls_root_path: "D:/ProjectDoc/Police/data/hls" +hls_downloader_daily_rotate_hour: 3 # 凌晨轮换时间 +hls_downloader_retention_days: 3 # 文件保留天数 +hls_downloader_retry_interval_seconds: 10 # 重试等待秒数 service_groups: - name: "kadian_group" # 服务组名称 diff --git a/hls_downloader.py b/hls_downloader.py index 473b2ed..9da3f6d 100644 --- a/hls_downloader.py +++ b/hls_downloader.py @@ -1,196 +1,283 @@ -import requests -import m3u8 -import time +# hls_downloader.py +# HLS分片TS下载管理进程 +# 功能:管理ffmpeg下载HLS直播流,支持异常重试、定时轮换、文件清理 + import os -from urllib.parse import urljoin -from datetime import datetime +import sys +import json +import argparse +import signal +import subprocess +import shutil +import time +from datetime import datetime, timedelta +from typing import Optional + +from utils.logger import get_logger +logger = get_logger(__name__) + +ffmpeg_process: Optional[subprocess.Popen] = None +running = True +should_restart = False # 标记是否需要重新启动ffmpeg(凌晨3点触发) -class RawHLSDownloader: - """直接下载HLS分片,保留原始时间戳""" +def parse_args(): + """解析命令行参数""" + parser = argparse.ArgumentParser(description="HLS Downloader Manager") + parser.add_argument("--index-code", required=True, help="Camera index code") + parser.add_argument("--camera-name", required=True, help="Camera name") + parser.add_argument("--camera-id", required=True, type=int, help="Camera ID") + parser.add_argument("--hls-root-path", required=True, help="HLS root path") + parser.add_argument("--rotate-hour", required=True, type=int, help="Daily rotate hour (0-23)") + parser.add_argument("--retention-days", required=True, type=int, help="File retention days") + parser.add_argument("--retry-interval", required=True, type=int, help="Retry interval seconds") + return parser.parse_args() - def __init__(self, m3u8_url, output_dir="segments"): - self.m3u8_url = m3u8_url - self.output_dir = output_dir - self.session = requests.Session() - self.session.headers.update({ - 'User-Agent': 'Mozilla/5.0' - }) - os.makedirs(output_dir, exist_ok=True) +# def init_logger(index_code: str, pid: int): +# """初始化日志记录器""" +# global logger +# log_dir = "log" +# if not os.path.exists(log_dir): +# os.makedirs(log_dir) +# log_file = os.path.join(log_dir, f"hls_downloader_{index_code}_{pid}.log") +# +# # 创建独立的logger实例 +# from logging import Logger, FileHandler, Formatter, INFO +# logger = Logger(f"hls_downloader_{index_code}") +# handler = FileHandler(log_file, encoding='utf-8') +# handler.setFormatter(Formatter('%(asctime)s - %(levelname)s - %(message)s')) +# logger.addHandler(handler) +# logger.setLevel(INFO) +# +# logger.info(f"Logger initialized, log file: {log_file}") - def download_playlist(self): - """下载并解析m3u8播放列表""" - response = self.session.get(self.m3u8_url) - response.raise_for_status() - # 解析播放列表 - playlist = m3u8.loads(response.text, uri=self.m3u8_url) +def get_hls_url(index_code: str) -> Optional[str]: + """获取HLS播放地址""" + try: + import test_cam + result = test_cam.get_camera_hls_url(index_code) + if result.get("code") == "0" and result.get("data", {}).get("url"): + return result["data"]["url"] + else: + logger.error(f"Failed to get HLS URL: {result}") + return None + except Exception as e: + logger.error(f"Exception while getting HLS URL: {e}") + return None - # 如果是主播放列表(多码率),选择第一个 - if playlist.is_variant: - print(f"发现多码率流,选择: {playlist.playlists[0].stream_info}") - playlist_url = playlist.playlists[0].absolute_uri - response = self.session.get(playlist_url) - response.raise_for_status() - playlist = m3u8.loads(response.text, uri=playlist_url) - return playlist +def create_session_folder(hls_root_path: str, index_code: str) -> str: + """创建下载会话文件夹""" + session_name = datetime.now().strftime("%Y%m%d_%H%M%S") + index_folder = os.path.join(hls_root_path, index_code) + session_folder = os.path.join(index_folder, session_name) + + os.makedirs(session_folder, exist_ok=True) + logger.info(f"Created session folder: {session_folder}") + + return session_folder - def download_segment_raw(self, segment_url, segment_filename): - """直接下载TS分片,不进行任何处理""" - print(f"下载: {segment_url}") - response = self.session.get(segment_url, stream=True) - response.raise_for_status() +def start_ffmpeg(m3u8_url: str, session_folder: str) -> Optional[subprocess.Popen]: + """启动ffmpeg下载进程""" + global ffmpeg_process + + try: + segment_pattern = os.path.join(session_folder, "segment_%09d.ts") + playlist_path = os.path.join(session_folder, "playlist.m3u8") + + cmd = [ + "ffmpeg", + "-i", m3u8_url, + "-c", "copy", + "-hls_segment_filename", segment_pattern, + playlist_path + ] + + logger.info(f"Starting ffmpeg: {' '.join(cmd)}") + + # 启动ffmpeg,丢弃输出 + ffmpeg_process = subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + + logger.info(f"FFmpeg started with PID: {ffmpeg_process.pid}") + return ffmpeg_process + + except Exception as e: + logger.error(f"Failed to start ffmpeg: {e}") + return None - # 直接保存原始字节 - filepath = os.path.join(self.output_dir, segment_filename) - with open(filepath, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - # 获取下载时间戳 - download_time = datetime.now() - - # 获取文件大小 - file_size = os.path.getsize(filepath) - - print(f" 保存到: {segment_filename} ({file_size} 字节)") - - return { - 'filename': segment_filename, - 'url': segment_url, - 'size': file_size, - 'download_time': download_time, - 'local_path': filepath - } - - def analyze_raw_ts(self, ts_file): - """分析原始TS文件的时间戳(不通过FFmpeg)""" - import struct - - filepath = os.path.join(self.output_dir, ts_file) - - with open(filepath, 'rb') as f: - # 读取TS包(每个188字节) - packet_size = 188 - packets = [] - - while True: - packet = f.read(packet_size) - if len(packet) < packet_size: - break - - # 解析TS包头 - sync_byte = packet[0] - if sync_byte != 0x47: # TS同步字节 - print(f"警告: 无效的TS包同步字节: {sync_byte:02x}") - continue - - # 解析PID - pid = ((packet[1] & 0x1F) << 8) | packet[2] - - # 检查适配字段是否存在 - adaptation_field_control = (packet[3] >> 4) & 0x03 - - # 如果有适配字段,可能包含PCR - if adaptation_field_control in [2, 3]: - adaptation_field_length = packet[4] - if adaptation_field_length > 0: - pcr_flag = (packet[5] >> 4) & 0x01 - if pcr_flag and adaptation_field_length >= 6: - # 提取PCR - pcr_bytes = packet[6:12] - pcr_base = ( - (pcr_bytes[0] << 25) | - (pcr_bytes[1] << 17) | - (pcr_bytes[2] << 9) | - (pcr_bytes[3] << 1) | - (pcr_bytes[4] >> 7) - ) - pcr_extension = ((pcr_bytes[4] & 0x01) << 8) | pcr_bytes[5] - pcr_value = pcr_base * 300 + pcr_extension - pcr_ms = pcr_value / 27000.0 # 转换为毫秒 - - packets.append({ - 'pid': pid, - 'has_pcr': True, - 'pcr_ms': pcr_ms - }) - continue - - packets.append({ - 'pid': pid, - 'has_pcr': False - }) - - # 统计PCR信息 - pcr_packets = [p for p in packets if p['has_pcr']] - - return { - 'total_packets': len(packets), - 'pcr_packets': len(pcr_packets), - 'pcr_values': [p['pcr_ms'] for p in pcr_packets] if pcr_packets else [], - 'file_size': os.path.getsize(filepath) - } - - def monitor_and_download(self, max_segments=None): - """监控并下载新的分片""" - downloaded_segments = set() - segment_counter = 0 - - while True: +def stop_ffmpeg(): + """停止ffmpeg进程""" + global ffmpeg_process + + if ffmpeg_process and ffmpeg_process.poll() is None: + logger.info(f"Stopping ffmpeg process (PID: {ffmpeg_process.pid})") + try: + ffmpeg_process.terminate() + # 等待进程结束 try: - # 获取最新播放列表 - playlist = self.download_playlist() - - # 检查播放列表中的分片 - for segment in playlist.segments: - segment_url = segment.absolute_uri - segment_filename = os.path.basename(segment_url) - - # 如果还没下载过 - if segment_filename not in downloaded_segments: - # 下载原始分片 - result = self.download_segment_raw(segment_url, segment_filename) - - # 分析原始TS文件 - analysis = self.analyze_raw_ts(segment_filename) - - print(f" 分析结果: {analysis['total_packets']}包, " - f"PCR包: {analysis['pcr_packets']}") - - if analysis['pcr_values']: - print(f" PCR范围: {min(analysis['pcr_values']):.1f}ms - " - f"{max(analysis['pcr_values']):.1f}ms") - - downloaded_segments.add(segment_filename) - segment_counter += 1 - - # 检查是否达到最大数量 - if max_segments and segment_counter >= max_segments: - print(f"达到最大分片数: {max_segments}") - return - - # 等待下一轮 - sleep_time = playlist.target_duration or 2 - print(f"等待 {sleep_time} 秒...") - time.sleep(sleep_time) - - except KeyboardInterrupt: - print("用户中断") - break - except Exception as e: - print(f"错误: {e}") - time.sleep(5) + ffmpeg_process.wait(timeout=5) + except subprocess.TimeoutExpired: + logger.warning("FFmpeg did not terminate gracefully, killing...") + ffmpeg_process.kill() + ffmpeg_process.wait() + logger.info("FFmpeg process stopped") + except Exception as e: + logger.error(f"Error stopping ffmpeg: {e}") + + ffmpeg_process = None -# 使用示例 -downloader = RawHLSDownloader( - m3u8_url="http://192.168.110.139:8080/stream.m3u8", - output_dir="raw_segments" -) +def cleanup_old_folders(hls_root_path: str, index_code: str, retention_days: int): + """清理过期的下载文件夹""" + try: + index_folder = os.path.join(hls_root_path, index_code) + if not os.path.exists(index_folder): + return + + cutoff_date = datetime.now() - timedelta(days=retention_days) + cutoff_str = cutoff_date.strftime("%Y%m%d") + + deleted_count = 0 + for folder_name in os.listdir(index_folder): + folder_path = os.path.join(index_folder, folder_name) + if os.path.isdir(folder_path): + # 从文件夹名称解析日期 (yyyyMMdd_HHmmss) + try: + date_str = folder_name.split("_")[0] + if date_str < cutoff_str: + shutil.rmtree(folder_path) + logger.info(f"Deleted old folder: {folder_path}") + deleted_count += 1 + except Exception as e: + logger.warning(f"Failed to parse folder date {folder_name}: {e}") + + if deleted_count > 0: + logger.info(f"Cleaned up {deleted_count} old folders") + + except Exception as e: + logger.error(f"Error cleaning up old folders: {e}") -# 下载5个分片进行测试 -downloader.monitor_and_download(max_segments=5) \ No newline at end of file + +def get_next_rotate_time(rotate_hour: int) -> datetime: + """计算下次轮换时间""" + now = datetime.now() + rotate_time = now.replace(hour=rotate_hour, minute=0, second=0, microsecond=0) + + if now >= rotate_time: + # 已经过了今天的轮换时间,计算明天的 + rotate_time += timedelta(days=1) + + return rotate_time + + +def download_cycle(hls_root_path: str, index_code: str, retry_interval: int) -> bool: + """ + 执行一次下载流程 + 返回:True表示成功启动ffmpeg,False表示失败 + """ + # 1. 获取HLS地址 + m3u8_url = get_hls_url(index_code) + if not m3u8_url: + logger.error("Failed to get HLS URL, will retry later") + return False + + # 2. 创建会话文件夹 + try: + session_folder = create_session_folder(hls_root_path, index_code) + except Exception as e: + logger.error(f"Failed to create session folder: {e}") + return False + + # 3. 启动ffmpeg + proc = start_ffmpeg(m3u8_url, session_folder) + if not proc: + logger.error("Failed to start ffmpeg") + return False + + return True + + +def signal_handler(signum, frame): + """信号处理器""" + global running + logger.info(f"Received signal {signum}, shutting down...") + running = False + + +def main(): + global running, should_restart, ffmpeg_process + + args = parse_args() + + # # 初始化日志 + # init_logger(args.index_code, os.getpid()) + + # 注册信号处理器 + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + + logger.info(f"HLS Downloader starting for camera: {args.camera_name} (index: {args.index_code})") + logger.info(f"Config: hls_root={args.hls_root_path}, rotate_hour={args.rotate_hour}, " + f"retention_days={args.retention_days}, retry_interval={args.retry_interval}") + + # 计算下次轮换时间 + next_rotate_time = get_next_rotate_time(args.rotate_hour) + logger.info(f"Next rotation scheduled at: {next_rotate_time}") + + # 主循环 + while running: + # 如果没有运行中的ffmpeg,尝试启动 + if ffmpeg_process is None or ffmpeg_process.poll() is not None: + if ffmpeg_process is not None and ffmpeg_process.poll() is not None: + logger.warning(f"FFmpeg exited unexpectedly with code: {ffmpeg_process.returncode}") + + # 尝试启动下载 + success = download_cycle( + args.hls_root_path, + args.index_code, + args.retry_interval + ) + + if not success: + logger.info(f"Download failed, waiting {args.retry_interval} seconds before retry...") + time.sleep(args.retry_interval) + continue + + # 检查是否到达轮换时间 + now = datetime.now() + if now >= next_rotate_time: + logger.info("Daily rotation triggered") + + # 停止当前ffmpeg + stop_ffmpeg() + + # 清理旧文件 + cleanup_old_folders(args.hls_root_path, args.index_code, args.retention_days) + + # 计算下次轮换时间 + next_rotate_time = get_next_rotate_time(args.rotate_hour) + logger.info(f"Next rotation scheduled at: {next_rotate_time}") + + # 继续循环,会重新启动ffmpeg + time.sleep(1) + continue + + # 短暂休眠,避免CPU占用过高 + time.sleep(1) + + # 退出前清理 + logger.info("Shutting down HLS Downloader...") + stop_ffmpeg() + logger.info("HLS Downloader stopped") + + +if __name__ == "__main__": + main() diff --git a/main_start.py b/main_start.py index a56a4e2..f1f0f15 100644 --- a/main_start.py +++ b/main_start.py @@ -11,7 +11,8 @@ import os import signal import argparse import time -from typing import List, Dict +import glob +from typing import List, Dict, Optional from common.camera_config import CameraConfig from utils.logger import get_logger @@ -21,6 +22,9 @@ logger = get_logger(__name__) # PID 文件目录 PID_DIR = "pids" +# HLS下载器PID文件前缀 +HLS_DOWNLOADER_PID_PREFIX = "hls_downloader_" + def load_debug_mode(config_path: str = "config.yaml") -> bool: """从配置文件读取调试模式""" @@ -43,6 +47,27 @@ def load_service_groups_from_yaml(config_path: str = "config.yaml") -> List[dict return cfg.get("service_groups", []) +def load_hls_config(config_path: str = "config.yaml") -> dict: + """从配置文件加载HLS下载器配置""" + try: + with open(config_path, "r", encoding="utf-8") as f: + cfg = yaml.safe_load(f) + return { + "hls_root_path": cfg.get("hls_root_path", ""), + "daily_rotate_hour": cfg.get("hls_downloader_daily_rotate_hour", 3), + "retention_days": cfg.get("hls_downloader_retention_days", 3), + "retry_interval": cfg.get("hls_downloader_retry_interval_seconds", 10) + } + except Exception as e: + logger.error(f"[ERROR] Failed to load HLS config: {e}") + return { + "hls_root_path": "", + "daily_rotate_hour": 3, + "retention_days": 3, + "retry_interval": 10 + } + + def cameras_to_base64_json(cameras: List[dict]) -> str: """将摄像头配置转换为 base64 编码的 JSON 字符串""" json_str = json.dumps(cameras, ensure_ascii=False) @@ -54,6 +79,11 @@ def get_pid_file_path(group_name: str) -> str: return os.path.join(PID_DIR, f"{group_name}.pid") +def get_hls_downloader_pid_file_path(index_code: str) -> str: + """获取HLS下载器的 PID 文件路径""" + return os.path.join(PID_DIR, f"{HLS_DOWNLOADER_PID_PREFIX}{index_code}.pid") + + def ensure_pid_dir(): """确保 PID 目录存在""" if not os.path.exists(PID_DIR): @@ -72,6 +102,18 @@ def save_pid(pid: int, group_name: str): logger.error(f"[ERROR] Failed to save PID file: {e}") +def save_hls_downloader_pid(pid: int, index_code: str): + """保存HLS下载器进程ID到文件""" + ensure_pid_dir() + pid_file = get_hls_downloader_pid_file_path(index_code) + try: + with open(pid_file, "w") as f: + f.write(str(pid)) + logger.info(f"[INFO] Saved HLS downloader PID {pid} to {pid_file}") + except Exception as e: + logger.error(f"[ERROR] Failed to save HLS downloader PID file: {e}") + + def read_pid(group_name: str): """从PID文件读取进程ID""" pid_file = get_pid_file_path(group_name) @@ -85,6 +127,19 @@ def read_pid(group_name: str): return None +def read_hls_downloader_pid(index_code: str): + """从PID文件读取HLS下载器进程ID""" + pid_file = get_hls_downloader_pid_file_path(index_code) + try: + with open(pid_file, "r") as f: + return int(f.read().strip()) + except FileNotFoundError: + return None + except Exception as e: + logger.error(f"[ERROR] Failed to read HLS downloader PID file: {e}") + return None + + def is_process_running(pid: int): """检查进程是否在运行""" try: @@ -102,7 +157,7 @@ def get_script_path(video_source_type: str) -> str: return "rtsp_service_ws_kadian.py" -def start_service_group(group: dict): +def start_service_group(group: dict, hls_config: dict = None): """启动服务子进程(后台运行)""" cameras = group.get("cameras", []) ws_host = group.get("ws_host", "0.0.0.0") @@ -135,13 +190,53 @@ def start_service_group(group: dict): return process, group_name +def start_hls_downloader(camera: dict, hls_config: dict) -> Optional[subprocess.Popen]: + """启动HLS下载器进程""" + index_code = camera.get("index", "") + camera_name = camera.get("name", "") + camera_id = camera.get("id", 0) + + if not index_code: + logger.warning(f"[WARN] Camera has no index_code, skipping HLS downloader") + return None + + # 检查是否已经在运行 + pid = read_hls_downloader_pid(index_code) + if pid and is_process_running(pid): + logger.warning(f"[WARN] HLS downloader for '{index_code}' is already running with PID {pid}") + return None + + cmd = [ + sys.executable, + "hls_downloader.py", + "--index-code", index_code, + "--camera-name", camera_name, + "--camera-id", str(camera_id), + "--hls-root-path", hls_config.get("hls_root_path", ""), + "--rotate-hour", str(hls_config.get("daily_rotate_hour", 3)), + "--retention-days", str(hls_config.get("retention_days", 3)), + "--retry-interval", str(hls_config.get("retry_interval", 10)) + ] + + logger.info(f"[INFO] Starting HLS downloader for camera '{camera_name}' (index: {index_code})") + + if DEBUG_MODE: + process = subprocess.Popen(cmd) + else: + process = subprocess.Popen(cmd, start_new_session=True) + + return process + + def start_service(): """启动所有服务组""" config_path = "config.yaml" # 1. 读取配置 service_groups = load_service_groups_from_yaml(config_path) + hls_config = load_hls_config(config_path) logger.info(f"[INFO] Loaded {len(service_groups)} service groups from {config_path}") + logger.info(f"[INFO] HLS config: root_path={hls_config.get('hls_root_path')}, rotate_hour={hls_config.get('daily_rotate_hour')}") if not service_groups: logger.error("[ERROR] No service groups found in config, exiting...") @@ -149,27 +244,49 @@ def start_service(): # 2. 启动每个服务组 started_count = 0 + downloader_count = 0 processes = [] # 记录所有子进程,用于调试模式下等待 + downloader_processes = [] # 记录下载器进程 + for group in service_groups: group_name = group.get("name", "default") + video_source_type = group.get("video_source_type", "rtsp") + cameras = group.get("cameras", []) # 检查是否已经在运行 pid = read_pid(group_name) if pid and is_process_running(pid): logger.warning(f"[WARN] Service group '{group_name}' is already running with PID {pid}") - continue + else: + try: + process, name = start_service_group(group, hls_config) + time.sleep(0.5) + save_pid(process.pid, name) + logger.info(f"[INFO] Service group '{name}' started with PID {process.pid}") + processes.append((process, name)) + started_count += 1 + except Exception as e: + logger.error(f"[ERROR] Failed to start service group '{group_name}': {e}") - try: - process, name = start_service_group(group) - time.sleep(0.5) - save_pid(process.pid, name) - logger.info(f"[INFO] Service group '{name}' started with PID {process.pid}") - processes.append((process, name)) - started_count += 1 - except Exception as e: - logger.error(f"[ERROR] Failed to start service group '{group_name}': {e}") + # 如果是HLS类型,为每个摄像头启动下载进程 + if video_source_type == "hls" and hls_config.get("hls_root_path"): + for camera in cameras: + index_code = camera.get("index", "") + if not index_code: + continue + try: + downloader_proc = start_hls_downloader(camera, hls_config) + if downloader_proc: + time.sleep(0.3) + save_hls_downloader_pid(downloader_proc.pid, index_code) + logger.info(f"[INFO] HLS downloader for '{index_code}' started with PID {downloader_proc.pid}") + downloader_processes.append((downloader_proc, index_code)) + downloader_count += 1 + except Exception as e: + logger.error(f"[ERROR] Failed to start HLS downloader for '{index_code}': {e}") logger.info(f"[INFO] Started {started_count}/{len(service_groups)} service groups") + logger.info(f"[INFO] Started {downloader_count} HLS downloaders") # DEBUG_MODE=True 时,主进程等待所有子进程 if DEBUG_MODE and processes: @@ -180,6 +297,18 @@ def start_service(): return started_count > 0 +def get_all_hls_downloader_pids() -> List[tuple]: + """获取所有HLS下载器的PID文件和index_code""" + result = [] + pid_pattern = os.path.join(PID_DIR, f"{HLS_DOWNLOADER_PID_PREFIX}*.pid") + for pid_file in glob.glob(pid_pattern): + # 从文件名提取index_code: hls_downloader_{index_code}.pid + filename = os.path.basename(pid_file) + index_code = filename[len(HLS_DOWNLOADER_PID_PREFIX):-4] # 去掉前缀和.pid后缀 + result.append((index_code, pid_file)) + return result + + def status_service(): """检查所有服务组状态""" config_path = "config.yaml" @@ -207,11 +336,31 @@ def status_service(): pass logger.info(f"[INFO] {running_count}/{len(service_groups)} service groups running") + + # 检查HLS下载器状态 + downloader_pids = get_all_hls_downloader_pids() + downloader_running = 0 + for index_code, pid_file in downloader_pids: + pid = read_hls_downloader_pid(index_code) + if pid and is_process_running(pid): + logger.info(f"[INFO] HLS downloader '{index_code}' is running with PID {pid}") + downloader_running += 1 + else: + logger.info(f"[INFO] HLS downloader '{index_code}' is not running") + if pid: + try: + os.remove(pid_file) + except: + pass + + if downloader_pids: + logger.info(f"[INFO] {downloader_running}/{len(downloader_pids)} HLS downloaders running") + return running_count > 0 def stop_service(force=False): - """停止所有服务组""" + """停止所有服务组和HLS下载器""" config_path = "config.yaml" service_groups = load_service_groups_from_yaml(config_path) @@ -268,6 +417,58 @@ def stop_service(force=False): logger.error(f"[ERROR] Failed to stop service group '{group_name}': {e}") logger.info(f"[INFO] Stopped {stopped_count}/{len(service_groups)} service groups") + + # 停止所有HLS下载器 + downloader_pids = get_all_hls_downloader_pids() + downloader_stopped = 0 + for index_code, pid_file in downloader_pids: + pid = read_hls_downloader_pid(index_code) + + if not pid: + continue + + if not is_process_running(pid): + logger.warning(f"[WARN] HLS downloader '{index_code}' PID {pid} not running, cleaning up") + try: + os.remove(pid_file) + except: + pass + downloader_stopped += 1 + continue + + try: + if force: + logger.info(f"[INFO] Force killing HLS downloader '{index_code}' (PID {pid})") + os.kill(pid, signal.SIGKILL) + else: + logger.info(f"[INFO] Stopping HLS downloader '{index_code}' (PID {pid})") + os.kill(pid, signal.SIGTERM) + + # 等待进程结束 + for i in range(10): + if not is_process_running(pid): + break + time.sleep(1) + + if is_process_running(pid): + logger.warning(f"[WARN] Force killing HLS downloader '{index_code}'") + os.kill(pid, signal.SIGKILL) + time.sleep(1) + + # 清理PID文件 + try: + os.remove(pid_file) + except: + pass + + logger.info(f"[INFO] HLS downloader '{index_code}' stopped") + downloader_stopped += 1 + except Exception as e: + logger.error(f"[ERROR] Failed to stop HLS downloader '{index_code}': {e}") + + if downloader_pids: + logger.info(f"[INFO] Stopped {downloader_stopped}/{len(downloader_pids)} HLS downloaders") + return True