# hls_service_kadian.py
# Checkpoint ("kadian") detection service driven by HLS TS segments.
# Frames are read from local TS segments and released at their PTS intervals so
# that downstream consumers see something that behaves like a live stream.
import argparse
import glob
import os
import queue
import sys
import threading
import time
from dataclasses import dataclass

import av

# from biz.checkpoint.checkpoint_biz import FrameProcessorWorker
# from biz.prison.prison_biz import FrameProcessorWorker
# from biz.prison.trajectory02_biz import FrameProcessorWorker
# from biz.prison.supervision_room_biz import FrameProcessorWorker
from common.processor_factory import get_processor
from common.camera_config import CameraConfig, parse_cameras_from_json, parse_cameras_from_yaml
from common.contants import init_config
from utils.web_socket_sender import WebSocketSender
from utils.logger import get_logger

logger = get_logger(__name__)


# ========================= Combined TS reader / frame pacing thread =========================
class HLSFrameProcessor(threading.Thread):
    """Read one camera's TS segments and push PTS-paced frames onto a queue.

    Segments are looked up under ``<hls_root_path>/<camera index>/<folder>/``
    using the file pattern ``segment_<number>.ts``.
    """

    # File pattern of a TS segment inside a timestamp folder.
    SEGMENT_GLOB = "segment_*.ts"
    # How many of the newest segments are inspected when looking for the next one.
    LOOKBACK_SEGMENTS = 10
    # Playback starts / re-syncs at the N-th newest segment, and only advances
    # while at least this many segments exist after the current one.
    LIVE_EDGE_OFFSET = 3

    def __init__(self, camera_cfg: CameraConfig, hls_root_path: str,
                 raw_queue: queue.Queue, stop_event: threading.Event):
        """Store the configuration and initialise all playback state.

        Args:
            camera_cfg: Camera description; ``index``, ``id`` and ``name`` are read.
            hls_root_path: Directory that contains one sub-directory per camera index.
            raw_queue: Queue that receives the decoded frame dictionaries.
            stop_event: Event that asks the thread to terminate.
        """
        super().__init__(daemon=True)
        self.camera_cfg = camera_cfg
        self.hls_root_path = hls_root_path
        self.raw_queue = raw_queue
        self.stop_event = stop_event

        # Every attribute is created up front so that an instance built for a
        # camera without an index code is still safe to inspect and to start.
        self.index_code = camera_cfg.index
        self.camera_root_dir = None

        # HLS state
        self.current_segment_path = None  # path of the segment being played
        self.start_time = None            # wall-clock time matching base_pts
        self.base_pts = None              # PTS (ms) matching start_time

        # Time synchronisation
        self.last_process_time = None
        self.last_frame_pts = None
        self.pts_diff = 0
        self.should_reset_time = False    # re-anchor the time base on the next segment

        if not self.index_code:
            logger.error(f"[ERROR] Camera {camera_cfg.name} has no index_code")
            return

        # Directory that holds this camera's timestamp folders.
        self.camera_root_dir = os.path.join(hls_root_path, self.index_code)

    def _sorted_segments(self, folder_path: str) -> list:
        """Return the segment paths of one folder ordered by segment number."""
        numbered = []
        # glob.escape keeps special characters in the directory name literal.
        pattern = os.path.join(glob.escape(folder_path), self.SEGMENT_GLOB)
        for path in glob.glob(pattern):
            try:
                numbered.append((self.get_segment_number(path), path))
            except ValueError:
                # A file that matches the pattern but has no numeric suffix
                # must not break the lookup of all the valid segments.
                continue
        numbered.sort(key=lambda pair: pair[0])
        return [path for _, path in numbered]

    def get_latest_n_segments(self, n: int) -> list:
        """Return up to ``n`` of the newest segment paths, oldest first.

        Timestamp folders are visited from the newest name to the oldest
        (plain string ordering, so this assumes folder names sort
        chronologically). Older folders are only consulted while fewer than
        ``n`` segments have been collected, and their segments are placed
        *before* the ones already collected so the result stays chronological.

        Args:
            n: Maximum number of segments to return.

        Returns:
            A list of paths (possibly shorter than ``n``, empty when nothing is
            available or ``n`` is not positive).
        """
        if n <= 0 or not self.camera_root_dir:
            return []
        if not os.path.isdir(self.camera_root_dir):
            return []

        folders = [
            name for name in os.listdir(self.camera_root_dir)
            if os.path.isdir(os.path.join(self.camera_root_dir, name))
        ]
        folders.sort(reverse=True)  # newest folder first

        collected = []
        for folder in folders:
            folder_segments = self._sorted_segments(
                os.path.join(self.camera_root_dir, folder))
            # This folder is older than everything collected so far: prepend.
            collected = folder_segments + collected
            if len(collected) >= n:
                break

        return collected[-n:]

    def get_segment_number(self, filename):
        """Extract the numeric index from a ``..._<number>.<ext>`` file name.

        Raises:
            ValueError: If the part between the last ``_`` and the first
                following ``.`` is not an integer.
        """
        basename = os.path.basename(filename)
        return int(basename.split('_')[-1].split('.')[0])

    def find_nth_largest_segment(self, n):
        """Return the ``n``-th newest segment, or ``None`` if fewer than ``n`` exist."""
        if n <= 0:
            return None
        segments = self.get_latest_n_segments(n + 2)  # fetch a little extra as buffer
        if len(segments) < n:
            return None
        return segments[-n]

    def find_next_segment(self):
        """Pick the segment to play after ``current_segment_path``.

        Returns:
            The path of the next segment, or ``None`` when the caller should
            wait (no segments, not enough newer segments yet, or too few
            segments to re-sync).
        """
        latest = self.get_latest_n_segments(self.LOOKBACK_SEGMENTS)
        if not latest:
            return None

        # Case 1: the current segment is still in the window -> advance by one,
        # but only while enough newer segments exist to stay behind the live edge.
        if self.current_segment_path in latest:
            position = latest.index(self.current_segment_path)
            if position + self.LIVE_EDGE_OFFSET < len(latest):
                return latest[position + 1]
            return None  # too close to the newest segment, wait

        # Case 2: the current segment dropped out of the window -> re-sync.
        if len(latest) >= self.LIVE_EDGE_OFFSET:
            logger.warning("[WARN] Current segment not in latest list, re-syncing")
            return latest[-self.LIVE_EDGE_OFFSET]
        return None  # not enough segments yet, wait

    def initialize_playback(self):
        """Wait for the initial segment and anchor the playback clock.

        Polls every 0.5 s until the ``LIVE_EDGE_OFFSET``-th newest segment
        exists or the stop event is set.

        Returns:
            The path of the initial segment, or ``None`` if stopped first.
        """
        while not self.stop_event.is_set():
            candidate = self.find_nth_largest_segment(self.LIVE_EDGE_OFFSET)
            if candidate is not None:
                logger.info(f"[INFO] Found segment: {os.path.basename(candidate)}, starting playback")
                self.current_segment_path = candidate
                self.start_time = time.time()
                return candidate

            # Event.wait instead of time.sleep so a stop request is not delayed.
            self.stop_event.wait(0.5)
            logger.warning(f"[WARN] No segments found in {self.camera_root_dir}, waiting...")

        return None

    def process_frame_with_rate_control(self, frame_data, frame_pts):
        """Pace a single frame against the playback clock and enqueue it.

        Args:
            frame_data: Decoded frame payload stored under the ``"frame"`` key.
            frame_pts: Presentation timestamp of the frame in milliseconds.

        Returns:
            ``True`` if the frame was queued, ``False`` if it was dropped
            because the queue stayed full.
        """
        if self.start_time is None:
            # Defensive: called before initialize_playback() anchored the clock.
            self.start_time = time.time()
        if self.base_pts is None:
            self.base_pts = frame_pts
            self.last_frame_pts = frame_pts
            logger.info("[INFO] Frame processor initialized time base")

        # Wall-clock time at which this frame is due.
        expected_play_time = self.start_time + (frame_pts - self.base_pts) / 1000.0
        current_time = time.time()
        wait_time = expected_play_time - current_time

        if wait_time > 1:
            # More than 1 s early: the time base is stale, re-anchor instead of waiting.
            logger.info("[WARN] Too far ahead, resetting time base")
            self.start_time = current_time
            self.base_pts = frame_pts
        elif wait_time > 0:
            # Interruptible sleep until the frame is due.
            self.stop_event.wait(wait_time)
        elif wait_time < -1:
            # More than 1 s late: re-anchor so later frames are not all "late".
            logger.info("[WARN] Too far behind, resetting time base")
            self.start_time = current_time
            self.base_pts = frame_pts

        item = {
            "camera_id": self.camera_cfg.id,
            "camera_name": self.camera_cfg.name,
            "timestamp": current_time,
            "camera_index": self.camera_cfg.index,
            "frame": frame_data,
        }

        # When the queue is full, discard the oldest entry to make room.
        if self.raw_queue.full():
            try:
                self.raw_queue.get_nowait()
                # Keep the unfinished-task counter balanced for Queue.join().
                self.raw_queue.task_done()
            except queue.Empty:
                pass  # a consumer emptied the queue in the meantime

        try:
            self.raw_queue.put(item, timeout=0.5)
        except queue.Full:
            logger.warning(f"[WARN] Raw queue full, dropping frame from {self.camera_cfg.name}")
            return False
        return True

    @staticmethod
    def _backoff_delay(failures: int) -> float:
        """Return the polling delay in seconds for a count of consecutive misses."""
        if failures <= 10:
            return 0.02  # failures 1-10
        if failures <= 20:
            return 0.05  # failures 11-20
        return 0.5       # more than 20 failures

    def _play_until_stopped(self):
        """Play the initial segment, then follow new segments until stopped."""
        initial_segment = self.initialize_playback()
        if not initial_segment:
            logger.error("[ERROR] Failed to initialize playback")
            return

        self.process_segment_with_rate_control(initial_segment)

        consecutive_failures = 0
        while not self.stop_event.is_set():
            try:
                next_segment = self.find_next_segment()
                if next_segment is not None:
                    consecutive_failures = 0
                    logger.info(f"[INFO] Starting to process TS segment: {os.path.basename(next_segment)}")
                    self.current_segment_path = next_segment
                    self.process_segment_with_rate_control(next_segment)
                    logger.info(f"[INFO] Finished processing segment: {os.path.basename(next_segment)}")
                else:
                    # Nothing to play yet: back off progressively and re-anchor
                    # the time base when playback resumes.
                    consecutive_failures += 1
                    self.should_reset_time = True
                    sleep_time = self._backoff_delay(consecutive_failures)
                    logger.warning(f"[WARN] No next segment found (failures: {consecutive_failures}), waiting {sleep_time}s...")
                    self.stop_event.wait(sleep_time)
            except Exception as e:
                logger.error(f"[ERROR] HLSFrameProcessor loop error: {e}")
                self.stop_event.wait(1)  # pause before retrying

    def run(self):
        """Thread entry point: play segments and restart after unexpected errors.

        Restarts happen in a loop rather than by calling ``run()`` recursively,
        so repeated failures cannot exhaust the stack, and no restart is
        attempted once the stop event is set.
        """
        if not self.camera_root_dir:
            logger.error(f"[ERROR] Camera {self.camera_cfg.name} has no segment directory, thread exiting")
            return

        while not self.stop_event.is_set():
            try:
                self._play_until_stopped()
                return
            except Exception as e:
                logger.error(f"[ERROR] HLSFrameProcessor main error: {e}")
                logger.info("[INFO] HLSFrameProcessor will restart in 3 seconds...")
                # The clock anchored before the failure is no longer meaningful.
                self.should_reset_time = True
                self.stop_event.wait(3)

    def process_segment_with_rate_control(self, segment_path):
        """Decode one TS segment and feed its frames through the rate control.

        The time base is re-anchored on the first frame that carries a PTS when
        it has never been set or when ``should_reset_time`` is flagged. Errors
        are logged and swallowed so one bad segment does not stop playback.

        Args:
            segment_path: Path of the TS file to decode.
        """
        try:
            # The context manager closes the container even if decoding raises.
            with av.open(segment_path) as container:
                video_stream = container.streams.video[0]
                time_base = video_stream.time_base

                needs_reset = self.base_pts is None or self.should_reset_time
                current_time = time.time()

                for packet in container.demux(video_stream):
                    if self.stop_event.is_set():
                        break
                    for frame in packet.decode():
                        if self.stop_event.is_set():
                            break
                        if frame.pts is None:
                            continue

                        # PTS converted from stream time-base units to milliseconds.
                        pts_ms = float(frame.pts * time_base) * 1000

                        if needs_reset:
                            self.start_time = current_time
                            self.base_pts = pts_ms
                            self.should_reset_time = False
                            needs_reset = False
                            logger.info(f"[INFO] Time axis reset: start_time={current_time}, base_pts={self.base_pts:.1f}")

                        frame_np = frame.to_ndarray(format='bgr24')
                        self.process_frame_with_rate_control(frame_np, pts_ms)
        except Exception as e:
            logger.error(f"[ERROR] Failed to process TS file {segment_path}: {e}")


# ========================= Service main class =========================
class HLSKadianService:
    """Wire the per-camera readers, the business processor and the WebSocket sender."""

    def __init__(self, cameras: list[CameraConfig], hls_root_path: str,
                 ws_host: str = "0.0.0.0", ws_port: int = 8765, algorithm: str = ""):
        """Create the queues and workers; nothing is started yet.

        Args:
            cameras: Cameras to read; entries without ``index`` are skipped in start().
            hls_root_path: Root directory of the TS segments.
            ws_host: Host passed to the WebSocket sender.
            ws_port: Port passed to the WebSocket sender.
            algorithm: Key passed to ``get_processor`` to select the processor class.
        """
        self.cameras = cameras
        self.hls_root_path = hls_root_path
        self.ws_host = ws_host
        self.ws_port = ws_port
        self.algorithm = algorithm

        self.stop_event = threading.Event()
        self.raw_queue = queue.Queue(maxsize=3)    # raw frames; deliberately small
        self.ws_queue = queue.Queue(maxsize=1000)  # outgoing WebSocket messages
        self.frame_processor_workers = []

        processor_cls = get_processor(self.algorithm)
        self.biz_processor = processor_cls(self.raw_queue, self.ws_queue, self.stop_event, self.cameras)
        self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, self.ws_host, self.ws_port)

    def start(self):
        """Start the sender, the processor and one reader thread per usable camera."""
        # Make sure the HLS root directory exists.
        os.makedirs(self.hls_root_path, exist_ok=True)

        self.ws_sender.start()
        self.biz_processor.start()

        for cam in self.cameras:
            if not cam.index:
                logger.warning(f"[WARN] Camera {cam.name} has no index_code, skipping")
                continue

            worker = HLSFrameProcessor(cam, self.hls_root_path, self.raw_queue, self.stop_event)
            worker.start()
            self.frame_processor_workers.append(worker)

        logger.info("[INFO] HLS Kadian Service started (merged TS reader and frame processor)")

    def stop(self):
        """Signal all workers to stop and wait briefly for them to finish."""
        self.stop_event.set()
        # NOTE(review): Queue.join() has no timeout and returns only once every
        # queued item has been matched by task_done(). If the consumers exit on
        # stop_event with items still queued, or never call task_done(), these
        # calls block forever - confirm against the consumer implementations.
        self.raw_queue.join()
        self.ws_queue.join()
        for worker in self.frame_processor_workers:
            worker.join(timeout=2.0)
        self.biz_processor.join(timeout=2.0)
        self.ws_sender.join(timeout=2.0)
        logger.info("[INFO] Service stopped")


def main():
    """Parse the command line, start the service and run until interrupted."""
    parser = argparse.ArgumentParser(description="HLS Service for Detection")
    parser.add_argument("--cameras", type=str, help="Cameras config in JSON format (or base64 encoded JSON)")
    parser.add_argument("--config", type=str, default="config.yaml", help="Path to config YAML file")
    parser.add_argument("--ws-host", type=str, default="0.0.0.0", help="WebSocket host")
    parser.add_argument("--ws-port", type=int, default=8765, help="WebSocket port")
    parser.add_argument("--algorithm", type=str, default="", help="Algorithm type")
    parser.add_argument("--hls-root-path", type=str, required=True, help="HLS root path for TS segments")
    args = parser.parse_args()

    # Initialise the global configuration.
    init_config(args.config)

    # Cameras given on the command line take precedence over the config file.
    if args.cameras:
        cameras = parse_cameras_from_json(args.cameras)
        logger.info(f"[INFO] Loaded {len(cameras)} cameras from command line argument")
    else:
        cameras = parse_cameras_from_yaml(args.config)
        logger.info(f"[INFO] Loaded {len(cameras)} cameras from config file: {args.config}")

    if not cameras:
        logger.error("[ERROR] No cameras configured, exiting...")
        sys.exit(1)

    service = HLSKadianService(cameras, hls_root_path=args.hls_root_path, ws_host=args.ws_host,
                               ws_port=args.ws_port, algorithm=args.algorithm)
    service.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        service.stop()


if __name__ == "__main__":
    main()