将get_latest_n_segments分离出来到utils
This commit is contained in:
@@ -0,0 +1,52 @@
|
||||
import os
|
||||
import glob
|
||||
|
||||
|
||||
def get_latest_n_segments(camera_root_dir: str, n: int) -> list:
|
||||
"""
|
||||
获取最新的n个TS分片
|
||||
|
||||
逻辑:
|
||||
1. 获取index_code文件夹下所有时间戳文件夹
|
||||
2. 按时间戳名称降序排序(最新的在前)
|
||||
3. 从最新的时间戳文件夹开始获取分片
|
||||
4. 如果分片数不足n,继续从上一个时间戳文件夹获取
|
||||
5. 返回最新的n个分片路径list(按时间顺序,最旧的在前)
|
||||
"""
|
||||
if not os.path.exists(camera_root_dir):
|
||||
return []
|
||||
|
||||
# 获取所有时间戳文件夹并排序(字符串排序即时间排序)
|
||||
timestamp_folders = []
|
||||
for folder_name in os.listdir(camera_root_dir):
|
||||
folder_path = os.path.join(camera_root_dir, folder_name)
|
||||
if os.path.isdir(folder_path):
|
||||
timestamp_folders.append(folder_name)
|
||||
|
||||
if not timestamp_folders:
|
||||
return []
|
||||
|
||||
# 降序排序,最新的在前
|
||||
timestamp_folders.sort(reverse=True)
|
||||
|
||||
# 收集分片
|
||||
all_segments = []
|
||||
for ts_folder in timestamp_folders:
|
||||
ts_folder_path = os.path.join(camera_root_dir, ts_folder)
|
||||
pattern = os.path.join(ts_folder_path, "segment_*.ts")
|
||||
segment_files = glob.glob(pattern)
|
||||
|
||||
# 按分片序号排序
|
||||
segment_files.sort(key=lambda x: int(os.path.basename(x).split('_')[-1].split('.')[0]))
|
||||
|
||||
all_segments.extend(segment_files)
|
||||
|
||||
# 已经收集够了
|
||||
if len(all_segments) >= n:
|
||||
break
|
||||
|
||||
# 返回最新的n个(取最后n个,因为最新的在后面)
|
||||
if len(all_segments) >= n:
|
||||
return all_segments[-n:]
|
||||
else:
|
||||
return all_segments
|
||||
|
||||
Reference in New Issue
Block a user