import os import glob from common import constants def get_latest_n_segments(camera_root_dir: str, n: int) -> list: """ 获取最新的n个TS分片 逻辑: 1. 获取index_code文件夹下所有时间戳文件夹 2. 按时间戳名称降序排序(最新的在前) 3. 从最新的时间戳文件夹开始获取分片 4. 如果分片数不足n,继续从上一个时间戳文件夹获取 5. 返回最新的n个分片路径list(按时间顺序,最旧的在前) """ if not os.path.exists(camera_root_dir): return [] # 获取所有时间戳文件夹并排序(字符串排序即时间排序) timestamp_folders = [] for folder_name in os.listdir(camera_root_dir): folder_path = os.path.join(camera_root_dir, folder_name) if os.path.isdir(folder_path): timestamp_folders.append(folder_name) if not timestamp_folders: return [] # 降序排序,最新的在前 timestamp_folders.sort(reverse=True) # 收集分片 all_segments = [] for ts_folder in timestamp_folders: ts_folder_path = os.path.join(camera_root_dir, ts_folder) pattern = os.path.join(ts_folder_path, "segment_*.ts") segment_files = glob.glob(pattern) # 按分片序号排序 segment_files.sort(key=lambda x: int(os.path.basename(x).split('_')[-1].split('.')[0])) all_segments.extend(segment_files) # 已经收集够了 if len(all_segments) >= n: break # 返回最新的n个(取最后n个,因为最新的在后面) if len(all_segments) >= n: return all_segments[-n:] else: return all_segments def get_latest_n_segments_by_camera_id(camera_id: str, n: int) -> list: """ 根据摄像头ID获取最新的n个TS分片 Args: camera_id: 摄像头ID n: 获取的分片数量 Returns: 最新的n个分片路径list """ camera_root_dir = os.path.join(constants.HLS_ROOT_PATH, camera_id) return get_latest_n_segments(camera_root_dir, n)