Files
SupervisorAI/utils/hls_utils.py

196 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import glob
import re
from common import constants
def get_latest_n_segments(camera_root_dir: str, n: int) -> list:
"""
获取最新的n个TS分片
逻辑:
1. 获取index_code文件夹下所有时间戳文件夹
2. 按时间戳名称降序排序(最新的在前)
3. 从最新的时间戳文件夹开始获取分片
4. 如果分片数不足n继续从上一个时间戳文件夹获取
5. 返回最新的n个分片路径list按时间顺序最旧的在前
"""
if not os.path.exists(camera_root_dir):
return []
# 获取所有时间戳文件夹并排序(字符串排序即时间排序)
timestamp_folders = []
for folder_name in os.listdir(camera_root_dir):
folder_path = os.path.join(camera_root_dir, folder_name)
if os.path.isdir(folder_path):
timestamp_folders.append(folder_name)
if not timestamp_folders:
return []
# 降序排序,最新的在前
timestamp_folders.sort(reverse=True)
# 收集分片
all_segments = []
for ts_folder in timestamp_folders:
ts_folder_path = os.path.join(camera_root_dir, ts_folder)
pattern = os.path.join(ts_folder_path, "segment_*.ts")
segment_files = glob.glob(pattern)
# 按分片序号排序
segment_files.sort(key=lambda x: int(os.path.basename(x).split('_')[-1].split('.')[0]))
all_segments.extend(segment_files)
# 已经收集够了
if len(all_segments) >= n:
break
# 返回最新的n个取最后n个因为最新的在后面
if len(all_segments) >= n:
return all_segments[-n:]
else:
return all_segments
def get_latest_n_segments_by_camera_id(camera_id: str, n: int) -> list:
"""
根据摄像头ID获取最新的n个TS分片
Args:
camera_id: 摄像头ID
n: 获取的分片数量
Returns:
最新的n个分片路径list
"""
camera_root_dir = os.path.join(constants.HLS_ROOT_PATH, camera_id)
return get_latest_n_segments(camera_root_dir, n)
def parse_segment_info(segment_path: str) -> tuple:
"""
从TS分片路径解析出 camera_id, timestamp, sequence
Args:
segment_path: TS分片路径格式如: hls_root_path/camera_id/timestamp/segment_xxxxx.ts
Returns:
(camera_id, timestamp, sequence) 或 (None, None, None) 解析失败时
"""
try:
# 获取文件名和目录结构
# 路径格式: .../camera_id/timestamp/segment_00001.ts
abs_path = os.path.abspath(segment_path)
parts = abs_path.split(os.sep)
# 从后往前找
# parts[-1] = segment_00001.ts
# parts[-2] = timestamp
# parts[-3] = camera_id
if len(parts) < 3:
return None, None, None
# 解析序号
filename = parts[-1]
match = re.search(r'segment_(\d+)\.ts$', filename)
if not match:
return None, None, None
sequence = match.group(1)
# 时间戳
timestamp = parts[-2]
# camera_id
camera_id = parts[-3]
return camera_id, timestamp, sequence
except Exception:
return None, None, None
def get_segments_before_current(current_segment_path: str, n: int) -> list:
"""
获取当前分片之前的n个分片包括当前分片
Args:
current_segment_path: 当前TS分片路径
n: 需要获取的分片数量
Returns:
分片路径列表按时间顺序旧的在前如果不够n个则返回现有的
"""
if not current_segment_path or not os.path.exists(current_segment_path):
return []
# 解析当前分片信息
camera_id, timestamp, current_seq = parse_segment_info(current_segment_path)
if not camera_id:
return []
# 构建摄像头根目录和时间戳目录
camera_root_dir = os.path.join(constants.HLS_ROOT_PATH, camera_id)
timestamp_dir = os.path.join(camera_root_dir, timestamp)
if not os.path.exists(timestamp_dir):
return []
# 获取当前时间戳文件夹下的所有分片
pattern = os.path.join(timestamp_dir, "segment_*.ts")
segment_files = glob.glob(pattern)
# 按分片序号排序
segment_files.sort(key=lambda x: int(os.path.basename(x).split('_')[-1].split('.')[0]))
# 找到当前分片的位置
try:
current_index = segment_files.index(current_segment_path)
except ValueError:
# 当前分片不在列表中
return []
# 计算起始位置
start_index = max(0, current_index - n + 1)
# 返回从起始位置到当前位置的所有分片
result = segment_files[start_index:current_index + 1]
# 如果不够n个需要从之前的时间戳文件夹获取
if len(result) < n:
# 获取所有时间戳文件夹
timestamp_folders = []
for folder_name in os.listdir(camera_root_dir):
folder_path = os.path.join(camera_root_dir, folder_name)
if os.path.isdir(folder_path):
timestamp_folders.append(folder_name)
# 排序,找到当前时间戳之前的时间戳
timestamp_folders.sort()
try:
current_ts_index = timestamp_folders.index(timestamp)
except ValueError:
current_ts_index = len(timestamp_folders)
# 从之前的时间戳文件夹获取分片
needed_count = n - len(result)
for i in range(current_ts_index - 1, -1, -1):
prev_ts_dir = os.path.join(camera_root_dir, timestamp_folders[i])
prev_pattern = os.path.join(prev_ts_dir, "segment_*.ts")
prev_segments = glob.glob(prev_pattern)
prev_segments.sort(key=lambda x: int(os.path.basename(x).split('_')[-1].split('.')[0]))
# 取最后 needed_count 个
take_count = min(needed_count, len(prev_segments))
if take_count > 0:
result = prev_segments[-take_count:] + result
needed_count -= take_count
if needed_count <= 0:
break
return result