From cd27032e6d87c961479ed4b33eb8b5dcc2ae9cb9 Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Sat, 28 Feb 2026 14:51:16 +0800 Subject: [PATCH] =?UTF-8?q?hls=E9=80=82=E9=85=8D=E6=96=B0=E7=9A=84?= =?UTF-8?q?=E5=88=86=E7=89=87=E4=B8=8B=E8=BD=BD=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/contants.py | 2 + hls_service_ws_kadian.py | 163 ++++++++++++++++++++++++--------------- main_start.py | 4 + 3 files changed, 107 insertions(+), 62 deletions(-) diff --git a/common/contants.py b/common/contants.py index 8a8413f..484af5b 100644 --- a/common/contants.py +++ b/common/contants.py @@ -7,6 +7,8 @@ logger = get_logger(__name__) ALERT_PUSH_URL = "" HLS_ROOT_PATH = "" +HLS_SEGMENT_PATTERN = "segment_%09d.ts" # TS文件命名模式 + def init_config(config_path: str = "config.yaml"): """ diff --git a/hls_service_ws_kadian.py b/hls_service_ws_kadian.py index 1651321..2dc42c5 100644 --- a/hls_service_ws_kadian.py +++ b/hls_service_ws_kadian.py @@ -27,21 +27,27 @@ from utils.logger import get_logger logger = get_logger(__name__) -# HLS配置 -HLS_SEGMENT_DIR = "D:\\ProjectDoc\\Police\\data\\hls" # TS分片存储目录 -HLS_SEGMENT_PATTERN = "segment_%09d.ts" # TS文件命名模式 - # ========================= 合并的TS读取和帧处理线程 ========================= class HLSFrameProcessor(threading.Thread): - def __init__(self, camera_cfg: CameraConfig, raw_queue: queue.Queue, stop_event: threading.Event): + def __init__(self, camera_cfg: CameraConfig, hls_root_path: str, raw_queue: queue.Queue, stop_event: threading.Event): super().__init__(daemon=True) self.camera_cfg = camera_cfg + self.hls_root_path = hls_root_path self.raw_queue = raw_queue self.stop_event = stop_event + # 获取index_code + self.index_code = camera_cfg.index + if not self.index_code: + logger.error(f"[ERROR] Camera {camera_cfg.name} has no index_code") + return + + # 计算摄像头对应的根目录 + self.camera_root_dir = os.path.join(hls_root_path, self.index_code) + # HLS状态变量 - self.current_segment_num = -1 # 当前处理的TS分片序号 + self.current_segment_path = None # 当前处理的TS分片路径 self.start_time = None # 播放开始时间 self.base_pts = None # 第一个帧的PTS基准 @@ -52,65 +58,107 @@ class HLSFrameProcessor(threading.Thread): self.should_reset_time = False - def find_segment_files(self): - """查找TS分片文件,返回排序后的文件名列表""" - pattern = os.path.join(HLS_SEGMENT_DIR, "segment_*.ts") - files = glob.glob(pattern) - # 按文件名中的数字排序 - files.sort(key=lambda x: int(x.split('_')[-1].split('.')[0])) - return files + def get_latest_n_segments(self, n: int) -> list: + """ + 获取最新的n个TS分片 + + 逻辑: + 1. 获取index_code文件夹下所有时间戳文件夹 + 2. 按时间戳名称降序排序(最新的在前) + 3. 从最新的时间戳文件夹开始获取分片 + 4. 如果分片数不足n,继续从上一个时间戳文件夹获取 + 5. 返回最新的n个分片路径list(按时间顺序,最旧的在前) + """ + if not os.path.exists(self.camera_root_dir): + return [] + + # 获取所有时间戳文件夹并排序(字符串排序即时间排序) + timestamp_folders = [] + for folder_name in os.listdir(self.camera_root_dir): + folder_path = os.path.join(self.camera_root_dir, folder_name) + if os.path.isdir(folder_path): + timestamp_folders.append(folder_name) + + if not timestamp_folders: + return [] + + # 降序排序,最新的在前 + timestamp_folders.sort(reverse=True) + + # 收集分片 + all_segments = [] + for ts_folder in timestamp_folders: + ts_folder_path = os.path.join(self.camera_root_dir, ts_folder) + pattern = os.path.join(ts_folder_path, "segment_*.ts") + segment_files = glob.glob(pattern) + + # 按分片序号排序 + segment_files.sort(key=lambda x: int(os.path.basename(x).split('_')[-1].split('.')[0])) + + all_segments.extend(segment_files) + + # 已经收集够了 + if len(all_segments) >= n: + break + + # 返回最新的n个(取最后n个,因为最新的在后面) + if len(all_segments) >= n: + return all_segments[-n:] + else: + return all_segments def get_segment_number(self, filename): """从文件名中提取序号""" - return int(filename.split('_')[-1].split('.')[0]) + basename = os.path.basename(filename) + return int(basename.split('_')[-1].split('.')[0]) def find_nth_largest_segment(self, n): - """查找第n大的TS分片""" - files = self.find_segment_files() - if len(files) < n: + """查找第n大的TS分片(最新的n个中的第1个,即倒数第n个)""" + segments = self.get_latest_n_segments(n + 2) # 多获取几个确保有缓冲 + if len(segments) < n: return None - # 返回第n大的文件(从大到小排序) - files_sorted = sorted(files, key=self.get_segment_number, reverse=True) - return files_sorted[n-1] if n-1 < len(files_sorted) else None + # 返回倒数第n个(最新的n个中最旧的那个) + return segments[-n] def find_next_segment(self): """查找下一个TS分片""" - next_segment_num = self.current_segment_num + 1 - next_segment_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{next_segment_num:09d}.ts") + latest_segments = self.get_latest_n_segments(10) - # 检查+1, +2, +3是否存在 - check_segments = [ - next_segment_num + 0, # +1 - next_segment_num + 1, # +2 - next_segment_num + 2 # +3 - ] + if not latest_segments: + return None - for seg_num in check_segments: - seg_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{seg_num:09d}.ts") - if not os.path.exists(seg_path): - return None + # 情况1:当前分片在list中 → 返回下一个 + if self.current_segment_path in latest_segments: + current_index = latest_segments.index(self.current_segment_path) + if current_index + 3 < len(latest_segments): + return latest_segments[current_index + 1] + else: + return None # 当前是最新分片,等待 - # 所有检查的分片都存在,返回下一个分片 - return next_segment_path + # 情况2:当前分片不在list中 → 返回最新第3个(重新同步) + if len(latest_segments) >= 3: + logger.warning(f"[WARN] Current segment not in latest list, re-syncing") + return latest_segments[-3] # 倒数第3个 + else: + return None # 分片不足,等待 def initialize_playback(self): - """初始化播放:找到第3大的TS分片,建立时间基准""" + """初始化播放:找到最新第3大的TS分片,建立时间基准""" while not self.stop_event.is_set(): - # 查找第3大的TS分片 + # 查找最新第3大的TS分片 third_largest = self.find_nth_largest_segment(3) if third_largest is not None: - segment_num = self.get_segment_number(third_largest) - logger.info(f"[INFO] Found segment {segment_num}, starting playback") + logger.info(f"[INFO] Found segment: {os.path.basename(third_largest)}, starting playback") # 设置基准时间 - self.current_segment_num = segment_num + self.current_segment_path = third_largest self.start_time = time.time() return third_largest # 等待0.5秒后重试 time.sleep(0.5) - logger.warning(f"[WARN] No segments found, waiting...") + logger.warning(f"[WARN] No segments found in {self.camera_root_dir}, waiting...") return None @@ -129,15 +177,6 @@ class HLSFrameProcessor(threading.Thread): # 计算时间差 wait_time = expected_play_time - current_time - # # 打印距离上次处理过去的时间 - # current_time = time.time() - # if self.last_process_time is not None: - # time_since_last = (current_time - self.last_process_time) * 1000 # 转换为毫秒 - # self.pts_diff = frame_pts - self.last_frame_pts # 与上一帧的PTS差 - # # logger.info(f"[INFO] Time since last frame: {time_since_last:.1f}ms, pts: {frame_pts} ms, pts_diff: {self.pts_diff}ms") - # self.last_process_time = current_time - # self.last_frame_pts = frame_pts - if wait_time > 0: if wait_time > 1: # 如果等待时间过长,重置时间基准 logger.info(f"[WARN] Too far ahead, resetting time base") @@ -145,14 +184,11 @@ class HLSFrameProcessor(threading.Thread): self.base_pts = frame_pts else: time.sleep(wait_time) - # logger.info(f"[DEBUG] Frame Waiting for {wait_time*1000:.1f}ms") else : if wait_time < -1: logger.info(f"[WARN] Too far behind, resetting time base") self.start_time = current_time self.base_pts = frame_pts - # else: - # logger.info(f"[WARN] frame ahead, no waiting") item = { "camera_id": self.camera_cfg.id, @@ -171,8 +207,6 @@ class HLSFrameProcessor(threading.Thread): pass self.raw_queue.put(item, timeout=0.5) - # if self.pts_diff > 0: - # time.sleep(self.pts_diff/1000) return True except queue.Full: @@ -204,12 +238,12 @@ class HLSFrameProcessor(threading.Thread): consecutive_failures = 0 # 处理下一个分片 - logger.info(f"[INFO] Starting to process TS segment: {next_segment}") - self.current_segment_num += 1 + logger.info(f"[INFO] Starting to process TS segment: {os.path.basename(next_segment)}") + self.current_segment_path = next_segment self.process_segment_with_rate_control(next_segment) - logger.info(f"[INFO] Finished processing segment {self.current_segment_num}") + logger.info(f"[INFO] Finished processing segment: {os.path.basename(next_segment)}") else: # 下一个分片不存在,根据连续失败次数调整等待时间 consecutive_failures += 1 @@ -287,8 +321,9 @@ class HLSFrameProcessor(threading.Thread): # ========================= 服务主类 ========================= class HLSKadianService: - def __init__(self, cameras: list[CameraConfig], ws_host: str = "0.0.0.0", ws_port: int = 8765, algorithm: str = ""): + def __init__(self, cameras: list[CameraConfig], hls_root_path: str, ws_host: str = "0.0.0.0", ws_port: int = 8765, algorithm: str = ""): self.cameras = cameras + self.hls_root_path = hls_root_path self.ws_host = ws_host self.ws_port = ws_port self.algorithm = algorithm @@ -302,16 +337,19 @@ class HLSKadianService: self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, self.ws_host, self.ws_port) def start(self): - # 确保TS分片目录存在 - os.makedirs(HLS_SEGMENT_DIR, exist_ok=True) + # 确保HLS根目录存在 + os.makedirs(self.hls_root_path, exist_ok=True) self.ws_sender.start() self.biz_processor.start() # 为每个摄像头启动合并后的帧处理线程 for cam in self.cameras: + if not cam.index: + logger.warning(f"[WARN] Camera {cam.name} has no index_code, skipping") + continue # 启动合并后的帧处理线程(包含TS读取和帧率控制) - frame_processor = HLSFrameProcessor(cam, self.raw_queue, self.stop_event) + frame_processor = HLSFrameProcessor(cam, self.hls_root_path, self.raw_queue, self.stop_event) frame_processor.start() self.frame_processor_workers.append(frame_processor) @@ -337,6 +375,7 @@ if __name__ == "__main__": parser.add_argument("--ws-host", type=str, default="0.0.0.0", help="WebSocket host") parser.add_argument("--ws-port", type=int, default=8765, help="WebSocket port") parser.add_argument("--algorithm", type=str, default="", help="Algorithm type") + parser.add_argument("--hls-root-path", type=str, required=True, help="HLS root path for TS segments") args = parser.parse_args() # 初始化全局配置 @@ -354,7 +393,7 @@ if __name__ == "__main__": logger.error("[ERROR] No cameras configured, exiting...") sys.exit(1) - service = HLSKadianService(cameras, ws_host=args.ws_host, ws_port=args.ws_port, algorithm=args.algorithm) + service = HLSKadianService(cameras, hls_root_path=args.hls_root_path, ws_host=args.ws_host, ws_port=args.ws_port, algorithm=args.algorithm) service.start() try: while True: diff --git a/main_start.py b/main_start.py index 00a843a..034b85b 100644 --- a/main_start.py +++ b/main_start.py @@ -190,6 +190,10 @@ def start_service_group(group: dict, hls_config: dict = None): "--algorithm", algorithm ] + # 如果是HLS类型,添加hls-root-path参数 + if video_source_type == "hls" and hls_config: + cmd.extend(["--hls-root-path", hls_config.get("hls_root_path", "")]) + logger.info(f"[INFO] Starting service group '{group_name}': ws={ws_host}:{ws_port}, algorithm={algorithm}, source={video_source_type}") # DEBUG_MODE=True 时前台运行,方便调试;False 时后台运行,适合部署