# hls_service_kadian.py
# Checkpoint ("kadian") detection service driven by HLS TS segments.
# Reads frames from local TS segments and replays them paced by their PTS
# so that downstream consumers see something close to a live stream.
import glob
import os
import queue
import threading
import time
from dataclasses import dataclass

import av
import yaml

from biz.checkpoint.checkpoint_biz import (
    KadianDetector,
    RTSP_TARGET_FPS,
    ALERT_PUSH_INTERVAL,
    FrameProcessorWorker,
)
from utils.web_socket_sender import WebSocketSender
from utils.logger import get_logger

logger = get_logger(__name__)

WS_HOST = "0.0.0.0"
WS_PORT = 8765

# HLS configuration
HLS_SEGMENT_DIR = "D:\\ProjectDoc\\Police\\data\\hls"  # directory holding the TS segments
HLS_SEGMENT_PATTERN = "segment_%09d.ts"  # TS file naming pattern

# Number of consecutive segments (next, next+1, next+2) that must already be
# on disk before the reader advances to the next one.
SEGMENT_LOOKAHEAD = 3


# ========================= Data structures =========================
@dataclass
class CameraConfig:
    """Static description of one camera as loaded from the YAML config."""

    id: int
    name: str
    index: str


# ========================= TS file reader thread =========================
class TSReaderWorker(threading.Thread):
    """Decode TS segments from ``HLS_SEGMENT_DIR`` and push frames to a queue.

    Each queued item is a dict ``{"frame": <BGR ndarray>, "pts_ms": <float>}``.
    """

    def __init__(self, camera_cfg: CameraConfig, frame_queue: queue.Queue,
                 stop_event: threading.Event):
        super().__init__(daemon=True)
        self.camera_cfg = camera_cfg
        self.frame_queue = frame_queue
        self.stop_event = stop_event

        # HLS state
        self.current_segment_num = -1  # number of the segment read most recently
        self.start_time = None         # wall-clock time at which playback started
        self.base_pts = None           # PTS (ms) of the first decoded frame

        # Back-pressure: no new segment is decoded while the queue holds at
        # least this many frames.
        self.queue_threshold = 150

    @staticmethod
    def _segment_path(segment_num):
        """Return the on-disk path of segment number ``segment_num``."""
        return os.path.join(HLS_SEGMENT_DIR, HLS_SEGMENT_PATTERN % segment_num)

    def find_segment_files(self):
        """Return the TS segment paths sorted by ascending segment number.

        Files that match ``segment_*.ts`` but carry no parsable number are
        skipped instead of aborting the whole scan.
        """
        pattern = os.path.join(HLS_SEGMENT_DIR, "segment_*.ts")
        numbered = []
        for path in glob.glob(pattern):
            try:
                numbered.append((self.get_segment_number(path), path))
            except ValueError:
                # Stray file such as "segment_tmp.ts": ignore it.
                continue
        numbered.sort(key=lambda pair: pair[0])
        return [path for _, path in numbered]

    def get_segment_number(self, filename):
        """Extract the numeric segment index from a segment file name.

        Raises:
            ValueError: if the text between the last ``_`` and the following
                ``.`` is not an integer.
        """
        return int(filename.split('_')[-1].split('.')[0])

    def find_nth_largest_segment(self, n):
        """Return the path of the segment with the n-th largest number.

        ``n`` is 1-based. Returns ``None`` when ``n`` is below 1 or fewer
        than ``n`` segments exist.
        """
        files = self.find_segment_files()
        if n < 1 or len(files) < n:
            return None
        # ``files`` is ascending, so the n-th largest sits n from the end.
        return files[-n]

    def read_ts_frames(self, ts_file):
        """Decode every video frame of ``ts_file``.

        Returns:
            A list of dicts with keys ``frame`` (BGR ndarray), ``pts_ms``
            (presentation timestamp in milliseconds, as ``float``) and
            ``timestamp`` (wall-clock time of decoding). An empty list is
            returned when the file cannot be read; frames decoded before the
            failure are discarded.
        """
        frames_with_pts = []
        try:
            # The context manager closes the container even if decoding fails.
            with av.open(ts_file) as container:
                video_stream = container.streams.video[0]
                for packet in container.demux(video_stream):
                    for frame in packet.decode():
                        if frame.pts is None:
                            continue
                        # pts * time_base is a Fraction in seconds; convert
                        # to plain float milliseconds.
                        pts_ms = float(frame.pts * video_stream.time_base * 1000)
                        frames_with_pts.append({
                            'frame': frame.to_ndarray(format='bgr24'),
                            'pts_ms': pts_ms,
                            'timestamp': time.time(),  # current system time
                        })
            return frames_with_pts
        except Exception as e:
            logger.error(f"[ERROR] Failed to read TS file {ts_file}: {e}")
            return []

    def initialize_playback(self):
        """Wait for the 3rd-largest segment, decode it and set the time base.

        Polls every 0.5 s until a decodable segment is found or the stop
        event is set.

        Returns:
            The decoded frames of the starting segment, or ``[]`` when the
            worker was asked to stop first.
        """
        while not self.stop_event.is_set():
            third_largest = self.find_nth_largest_segment(3)
            if third_largest is not None:
                segment_num = self.get_segment_number(third_largest)
                logger.info(f"[INFO] Found segment {segment_num}, starting playback")

                frames = self.read_ts_frames(third_largest)
                if frames:
                    # Establish the playback reference point.
                    self.current_segment_num = segment_num
                    self.start_time = time.time()
                    self.base_pts = frames[0]['pts_ms']
                    return frames
                logger.warning(f"[WARN] No frames found in segment {segment_num}")

            # Retry after 0.5 s; wait() returns early when stop is requested.
            self.stop_event.wait(0.5)
            logger.warning("[WARN] No frames found, waiting...")

        return []

    def find_next_segment(self):
        """Return the path of the next segment, or ``None`` if not ready.

        The next segment is only handed out once it and the following
        ``SEGMENT_LOOKAHEAD - 1`` segments all exist on disk.
        """
        next_segment_num = self.current_segment_num + 1
        candidates = [
            self._segment_path(next_segment_num + offset)
            for offset in range(SEGMENT_LOOKAHEAD)
        ]
        if all(os.path.exists(path) for path in candidates):
            return candidates[0]
        return None

    def _enqueue_frames(self, frames):
        """Push ``frames`` into the frame queue, dropping the oldest when full."""
        for frame_data in frames:
            if self.stop_event.is_set():
                return
            item = {
                "frame": frame_data['frame'],
                "pts_ms": frame_data['pts_ms'],
            }
            try:
                if self.frame_queue.full():
                    try:
                        self.frame_queue.get_nowait()
                        # Keep the unfinished-task counter balanced for the
                        # item that was just discarded.
                        self.frame_queue.task_done()
                    except queue.Empty:
                        pass
                self.frame_queue.put(item, timeout=0.5)
            except queue.Full:
                logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}")
                break  # queue is still full: give up on the rest of this batch

    def _read_loop(self):
        """Initialise playback, then keep decoding and queueing segments."""
        initial_frames = self.initialize_playback()
        if not initial_frames:
            logger.error("[ERROR] Failed to initialize playback")
            return

        # The starting segment has already been decoded; queue it instead of
        # throwing the frames away.
        self._enqueue_frames(initial_frames)

        while not self.stop_event.is_set():
            try:
                # Back-pressure: let the consumer catch up first.
                if self.frame_queue.qsize() >= self.queue_threshold:
                    self.stop_event.wait(0.2)
                    continue

                next_segment = self.find_next_segment()
                if next_segment is None:
                    logger.warning("[WARN] No next segment found, waiting...")
                    self.stop_event.wait(0.5)
                    continue

                logger.info(f"[INFO] Starting to read TS segment: {next_segment}")
                read_started = time.monotonic()
                frames = self.read_ts_frames(next_segment)
                elapsed_time = (time.monotonic() - read_started) * 1000  # ms
                logger.info(f"[INFO] Finished reading TS segment, took {elapsed_time:.1f}ms")

                if not frames:
                    # NOTE(review): a permanently corrupt segment is retried
                    # forever here - confirm whether it should be skipped.
                    logger.warning("[WARN] Failed to read next segment, waiting...")
                    self.stop_event.wait(0.5)
                    continue

                self.current_segment_num += 1
                logger.info(f"[INFO] Switching to segment {self.current_segment_num}")

                # Queue the whole segment in one go.
                self._enqueue_frames(frames)

            except Exception as e:
                logger.error(f"[ERROR] TSReaderWorker loop error: {e}")
                self.stop_event.wait(1)  # back off for 1 s after an error

    def run(self):
        """Thread entry point: read segments until stopped.

        An unexpected failure restarts the read loop after 3 s. The restart
        is a loop, not a recursive call, so repeated failures cannot
        exhaust the stack.
        """
        while not self.stop_event.is_set():
            try:
                self._read_loop()
                return
            except Exception as e:
                logger.error(f"[ERROR] TSReaderWorker main error: {e}")
                logger.info("[INFO] TSReaderWorker will restart in 3 seconds...")
                self.stop_event.wait(3)


# ========================= Frame pacing thread =========================
class HLSFrameProcessor(threading.Thread):
    """Move frames from ``frame_queue`` to ``raw_queue`` paced by their PTS."""

    def __init__(self, camera_cfg: CameraConfig, frame_queue: queue.Queue,
                 raw_queue: queue.Queue, stop_event: threading.Event):
        super().__init__(daemon=True)
        self.camera_cfg = camera_cfg
        self.frame_queue = frame_queue
        self.raw_queue = raw_queue
        self.stop_event = stop_event

        # Time synchronisation state
        self.start_time = None         # wall-clock time matching ``base_pts``
        self.base_pts = None           # PTS (ms) used as the playback origin
        self.last_process_time = None  # wall-clock time of the previous frame

    def _process_frame(self, frame_data):
        """Synchronise one frame against the clock and forward it.

        Returns:
            ``False`` when the frame was skipped because playback lags
            behind, ``True`` otherwise (including when the frame had to be
            dropped because ``raw_queue`` stayed full).
        """
        # Initialise the time base on the first frame.
        if self.start_time is None:
            self.start_time = time.time()
            self.base_pts = frame_data['pts_ms']
            logger.info("[INFO] Frame processor initialized time base")

        # Wall-clock moment at which this frame should be shown.
        expected_play_time = self.start_time + (frame_data['pts_ms'] - self.base_pts) / 1000.0
        current_time = time.time()
        time_diff = current_time - expected_play_time

        # Log the time elapsed since the previous frame.
        if self.last_process_time is not None:
            time_since_last = (current_time - self.last_process_time) * 1000  # ms
            logger.info(f"[INFO] Time since last frame: {time_since_last:.1f}ms, pts: {frame_data['pts_ms']} ms")
        self.last_process_time = current_time

        if time_diff > 0.06:
            # More than 60 ms late: skip this frame to catch up.
            logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping frame")
            return False

        if time_diff < -0.06:
            # More than 60 ms early.
            wait_time = -time_diff
            if wait_time > 0.1:
                # Too far ahead to wait for: re-anchor the time base instead.
                logger.info("[WARN] Too far ahead, resetting time base")
                self.start_time = current_time
                self.base_pts = frame_data['pts_ms']
            else:
                time.sleep(wait_time)
                logger.info(f"[DEBUG] Waiting for {wait_time*1000:.1f}ms")

        item = {
            "camera_id": self.camera_cfg.id,
            "camera_name": self.camera_cfg.name,
            "timestamp": current_time,
            "frame": frame_data["frame"],
        }

        try:
            # When raw_queue is full, drop its oldest entry to make room.
            if self.raw_queue.full():
                try:
                    self.raw_queue.get_nowait()
                    self.raw_queue.task_done()
                except queue.Empty:
                    pass
            self.raw_queue.put(item, timeout=0.5)
        except queue.Full:
            logger.warning(f"[WARN] Raw queue full, dropping frame from {self.camera_cfg.name}")

        return True

    def _process_loop(self):
        """Consume the frame queue until the stop event is set."""
        while not self.stop_event.is_set():
            try:
                frame_data = self.frame_queue.get(timeout=1.0)
            except queue.Empty:
                # Nothing queued yet; keep waiting.
                time.sleep(0.02)
                continue

            paced = False
            try:
                paced = self._process_frame(frame_data)
            except Exception as e:
                logger.error(f"[ERROR] HLSFrameProcessor loop error: {e}")
                self.stop_event.wait(1)  # back off for 1 s after an error
            finally:
                # Exactly one task_done() per successful get(), on every path.
                self.frame_queue.task_done()

            if paced:
                time.sleep(0.02)

    def run(self):
        """Thread entry point: pace frames until stopped.

        An unexpected failure restarts the loop after 3 s without recursing.
        """
        while not self.stop_event.is_set():
            try:
                self._process_loop()
                return
            except Exception as e:
                logger.error(f"[ERROR] HLSFrameProcessor main error: {e}")
                logger.info("[INFO] HLSFrameProcessor will restart in 3 seconds...")
                self.stop_event.wait(3)


# ========================= Service =========================
class HLSKadianService:
    """Wire the reader, pacing, business and WebSocket workers together."""

    def __init__(self, config_path: str = "config.yaml"):
        with open(config_path, "r", encoding="utf-8") as f:
            # safe_load returns None for an empty file.
            cfg = yaml.safe_load(f) or {}

        self.cameras = [
            CameraConfig(id=c["id"], name=c.get("name", f"cam_{c['id']}"), index=c["index"])
            for c in cfg.get("cameras", [])
        ]

        self.stop_event = threading.Event()
        self.frame_queue = queue.Queue(maxsize=2000)  # decoded frames, large capacity
        self.raw_queue = queue.Queue(maxsize=3)       # paced frames, small capacity
        self.ws_queue = queue.Queue(maxsize=1000)     # outgoing WebSocket messages

        self.ts_reader_workers = []
        self.frame_processor_workers = []
        self.biz_processor = FrameProcessorWorker(self.raw_queue, self.ws_queue, self.stop_event)
        self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, WS_HOST, WS_PORT)

    def start(self):
        """Start the sender, the business worker and the per-camera threads."""
        # Make sure the TS segment directory exists.
        os.makedirs(HLS_SEGMENT_DIR, exist_ok=True)

        self.ws_sender.start()
        self.biz_processor.start()

        # NOTE(review): every camera shares the same frame_queue and the same
        # segment directory, so with more than one camera the frames get
        # mixed between processors - confirm whether multi-camera is intended.
        for cam in self.cameras:
            ts_reader = TSReaderWorker(cam, self.frame_queue, self.stop_event)
            ts_reader.start()
            self.ts_reader_workers.append(ts_reader)

            frame_processor = HLSFrameProcessor(cam, self.frame_queue, self.raw_queue, self.stop_event)
            frame_processor.start()
            self.frame_processor_workers.append(frame_processor)

        logger.info("[INFO] HLS Kadian Service started")

    def stop(self):
        """Signal all workers to stop and wait briefly for them to exit.

        The queues are deliberately not ``join()``-ed: once the stop event is
        set the consumers stop draining them, so ``Queue.join()`` would block
        forever on any item still queued.
        """
        self.stop_event.set()

        for w in self.ts_reader_workers:
            w.join(timeout=2.0)
        for w in self.frame_processor_workers:
            w.join(timeout=2.0)
        self.biz_processor.join(timeout=2.0)
        self.ws_sender.join(timeout=2.0)
        logger.info("[INFO] Service stopped")


if __name__ == "__main__":
    service = HLSKadianService("config.yaml")
    service.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        service.stop()