From 8cb7610ac7b860c8a6952ec459425360b6fa1bf3 Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Thu, 5 Feb 2026 11:20:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=88=E5=B9=B6=E8=AF=BB=E5=8F=96ts=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=92=8Cframe=E7=BA=BF=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hls_service_ws_kadian.py | 381 ++++++++++++++++----------------------- 1 file changed, 155 insertions(+), 226 deletions(-) diff --git a/hls_service_ws_kadian.py b/hls_service_ws_kadian.py index 4e7ec54..df67185 100644 --- a/hls_service_ws_kadian.py +++ b/hls_service_ws_kadian.py @@ -34,12 +34,12 @@ class CameraConfig: index: str -# ========================= TS 文件读取线程 ========================= -class TSReaderWorker(threading.Thread): - def __init__(self, camera_cfg: CameraConfig, frame_queue: queue.Queue, stop_event: threading.Event): +# ========================= 合并的TS读取和帧处理线程 ========================= +class HLSFrameProcessor(threading.Thread): + def __init__(self, camera_cfg: CameraConfig, raw_queue: queue.Queue, stop_event: threading.Event): super().__init__(daemon=True) self.camera_cfg = camera_cfg - self.frame_queue = frame_queue + self.raw_queue = raw_queue self.stop_event = stop_event # HLS状态变量 @@ -47,8 +47,10 @@ class TSReaderWorker(threading.Thread): self.start_time = None # 播放开始时间 self.base_pts = None # 第一个帧的PTS基准 - # 队列阈值控制 - self.queue_threshold = 150 # 队列中帧数量阈值 + # 时间同步相关 + self.last_process_time = None # 上次处理时间 + self.last_frame_pts = None # 上一帧的PTS时间 + self.pts_diff = 0 def find_segment_files(self): """查找TS分片文件,返回排序后的文件名列表""" @@ -71,63 +73,6 @@ class TSReaderWorker(threading.Thread): files_sorted = sorted(files, key=self.get_segment_number, reverse=True) return files_sorted[n-1] if n-1 < len(files_sorted) else None - def read_ts_frames(self, ts_file): - """读取TS文件中的所有帧,返回帧列表和PTS信息""" - frames_with_pts = [] - - try: - container = av.open(ts_file) - video_stream = container.streams.video[0] - - for packet in container.demux(video_stream): - for frame in packet.decode(): - if frame.pts is not None: - # 计算PTS时间(毫秒) - pts_ms = frame.pts * video_stream.time_base * 1000 - - # 转换为numpy数组 - frame_np = frame.to_ndarray(format='bgr24') - - frames_with_pts.append({ - 'frame': frame_np, - 'pts_ms': pts_ms, - 'timestamp': time.time() # 当前系统时间 - }) - - container.close() - return frames_with_pts - - except Exception as e: - logger.error(f"[ERROR] Failed to read TS file {ts_file}: {e}") - return [] - - def initialize_playback(self): - """初始化播放:找到第3大的TS分片,建立时间基准""" - while not self.stop_event.is_set(): - # 查找第3大的TS分片 - third_largest = self.find_nth_largest_segment(3) - - if third_largest is not None: - segment_num = self.get_segment_number(third_largest) - logger.info(f"[INFO] Found segment {segment_num}, starting playback") - - # 读取该分片的所有帧 - frames = self.read_ts_frames(third_largest) - if frames: - # 设置基准时间 - self.current_segment_num = segment_num - self.start_time = time.time() - self.base_pts = frames[0]['pts_ms'] - return frames - else: - logger.warning(f"[WARN] No frames found in segment {segment_num}") - - # 等待0.5秒后重试 - time.sleep(0.5) - logger.warning(f"[WARN] No frames found, waiting...") - - return [] - def find_next_segment(self): """查找下一个TS分片""" next_segment_num = self.current_segment_num + 1 @@ -148,172 +93,125 @@ class TSReaderWorker(threading.Thread): # 所有检查的分片都存在,返回下一个分片 return next_segment_path + def initialize_playback(self): + """初始化播放:找到第3大的TS分片,建立时间基准""" + while not self.stop_event.is_set(): + # 查找第3大的TS分片 + third_largest = self.find_nth_largest_segment(3) + + if third_largest is not None: + segment_num = self.get_segment_number(third_largest) + logger.info(f"[INFO] Found segment {segment_num}, starting playback") + + # 设置基准时间 + self.current_segment_num = segment_num + self.start_time = time.time() + return third_largest + + # 等待0.5秒后重试 + time.sleep(0.5) + logger.warning(f"[WARN] No segments found, waiting...") + + return None + + def process_frame_with_rate_control(self, frame_data, frame_pts): + """处理单帧,加入帧率控制逻辑""" + # 初始化时间基准 + if self.base_pts is None: + self.base_pts = frame_pts + self.last_frame_pts = frame_pts + logger.info("[INFO] Frame processor initialized time base") + + # 计算预期的播放时间 + expected_play_time = self.start_time + (frame_pts - self.base_pts) / 1000.0 + current_time = time.time() + + # 计算时间差 + time_diff = current_time - expected_play_time + + # 打印距离上次处理过去的时间 + current_time = time.time() + if self.last_process_time is not None: + time_since_last = (current_time - self.last_process_time) * 1000 # 转换为毫秒 + self.pts_diff = frame_pts - self.last_frame_pts # 与上一帧的PTS差 + # logger.info(f"[INFO] Time since last frame: {time_since_last:.1f}ms, pts: {frame_pts} ms, pts_diff: {self.pts_diff}ms") + self.last_process_time = current_time + self.last_frame_pts = frame_pts + + # 时间同步策略 + if time_diff > 0.06: # 超过60ms,播放落后 (3帧) + # 跳过这一帧追赶 + logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping frame") + return False + elif time_diff < -0.06: # 超前60ms,需要等待 (3帧) + # 等待到正确的时间点 + wait_time = -time_diff + if wait_time > 0.1: # 如果等待时间过长,重置时间基准 + logger.info(f"[WARN] Too far ahead, resetting time base") + self.start_time = current_time + self.base_pts = frame_pts + else: + time.sleep(wait_time) + logger.info(f"[DEBUG] Waiting for {wait_time*1000:.1f}ms") + + # 时间同步良好,放入raw_queue + item = { + "camera_id": self.camera_cfg.id, + "camera_name": self.camera_cfg.name, + "timestamp": current_time, + "frame": frame_data, + } + + try: + # raw_queue满时处理 + if self.raw_queue.full(): + try: + self.raw_queue.get_nowait() + self.raw_queue.task_done() + except queue.Empty: + pass + + self.raw_queue.put(item, timeout=0.5) + + time.sleep(0.02) + # if self.pts_diff > 0: + # time.sleep(self.pts_diff/1000) + return True + + except queue.Full: + logger.warning(f"[WARN] Raw queue full, dropping frame from {self.camera_cfg.name}") + return False + def run(self): - """主循环:读取TS文件并将帧放入队列""" + """主循环:读取TS文件并实时处理帧,加入帧率控制""" try: # 初始化播放 - current_frames = self.initialize_playback() - if not current_frames: + initial_segment = self.initialize_playback() + if not initial_segment: logger.error("[ERROR] Failed to initialize playback") return + # 处理初始分片 + self.process_segment_with_rate_control(initial_segment) while not self.stop_event.is_set(): try: - # 检查队列大小,如果超过阈值则等待 - if self.frame_queue.qsize() >= self.queue_threshold: - time.sleep(0.2) - continue - - # 当前分片播放完毕,查找下一个分片 + # 查找下一个分片 next_segment = self.find_next_segment() if next_segment is not None: - # 读取下一个分片 - logger.info(f"[INFO] Starting to read TS segment: {next_segment}") - start_time = time.time() - - current_frames = self.read_ts_frames(next_segment) - - elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒 - logger.info(f"[INFO] Finished reading TS segment, took {elapsed_time:.1f}ms") - - if current_frames: - self.current_segment_num += 1 - frame_index = 0 - logger.info(f"[INFO] Switching to segment {self.current_segment_num}") - else: - logger.warning(f"[WARN] Failed to read next segment, waiting...") - time.sleep(0.5) - continue + # 处理下一个分片 + logger.info(f"[INFO] Starting to process TS segment: {next_segment}") + self.current_segment_num += 1 + + self.process_segment_with_rate_control(next_segment) + + logger.info(f"[INFO] Finished processing segment {self.current_segment_num}") else: # 下一个分片不存在,等待 logger.warning(f"[WARN] No next segment found, waiting...") time.sleep(0.5) - continue - - # 一次性将所有帧放入队列 - for frame_data in current_frames: - item = { - "frame": frame_data['frame'], - "pts_ms": frame_data['pts_ms'] - } - try: - # 队列满时处理 - if self.frame_queue.full(): - try: - self.frame_queue.get_nowait() - except queue.Empty: - pass - - self.frame_queue.put(item, timeout=0.5) - - except queue.Full: - logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}") - break # 如果队列满,停止放入更多帧 - - except Exception as e: - logger.error(f"[ERROR] TSReaderWorker loop error: {e}") - time.sleep(1) # 出错后等待1秒再继续 - - except Exception as e: - logger.error(f"[ERROR] TSReaderWorker main error: {e}") - logger.info("[INFO] TSReaderWorker will restart in 3 seconds...") - time.sleep(3) - # 重新启动线程 - self.run() - - -# ========================= 帧处理线程 ========================= -class HLSFrameProcessor(threading.Thread): - def __init__(self, camera_cfg: CameraConfig, frame_queue: queue.Queue, raw_queue: queue.Queue, stop_event: threading.Event): - super().__init__(daemon=True) - self.camera_cfg = camera_cfg - self.frame_queue = frame_queue - self.raw_queue = raw_queue - self.stop_event = stop_event - - # 时间同步相关 - self.start_time = None - self.base_pts = None - self.last_process_time = None # 上次处理时间 - - def run(self): - """主循环:从帧队列获取帧,按照PTS时间间隔放入raw_queue""" - try: - while not self.stop_event.is_set(): - try: - # 从帧队列获取帧数据(阻塞等待) - frame_data = self.frame_queue.get(timeout=1.0) - - # 初始化时间基准 - if self.start_time is None: - self.start_time = time.time() - self.base_pts = frame_data['pts_ms'] - logger.info("[INFO] Frame processor initialized time base") - - - # 计算预期的播放时间 - expected_play_time = self.start_time + (frame_data['pts_ms'] - self.base_pts) / 1000.0 - current_time = time.time() - - # 计算时间差 - time_diff = current_time - expected_play_time - - # 打印距离上次处理过去的时间 - current_time = time.time() - if self.last_process_time is not None: - time_since_last = (current_time - self.last_process_time) * 1000 # 转换为毫秒 - logger.info(f"[INFO] Time since last frame: {time_since_last:.1f}ms, pts: {frame_data['pts_ms']} ms") - self.last_process_time = current_time - - # 时间同步策略 - if time_diff > 0.06: # 超过60ms,播放落后 (3帧) - # 跳过这一帧追赶 - logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping frame") - self.frame_queue.task_done() - continue - elif time_diff < -0.06: # 超前60ms,需要等待 (3帧) - # 等待到正确的时间点 - wait_time = -time_diff - if wait_time > 0.1: # 如果等待时间过长,重置时间基准 - logger.info(f"[WARN] Too far ahead, resetting time base") - self.start_time = current_time - self.base_pts = frame_data['pts_ms'] - else: - time.sleep(wait_time) - logger.info(f"[DEBUG] Waiting for {wait_time*1000:.1f}ms") - - # 时间同步良好,放入raw_queue - item = { - "camera_id": self.camera_cfg.id, - "camera_name": self.camera_cfg.name, - "timestamp": current_time, - "frame": frame_data["frame"], - } - - try: - # raw_queue满时处理 - if self.raw_queue.full(): - try: - self.raw_queue.get_nowait() - self.raw_queue.task_done() - except queue.Empty: - pass - - self.raw_queue.put(item, timeout=0.5) - - except queue.Full: - logger.warning(f"[WARN] Raw queue full, dropping frame from {self.camera_cfg.name}") - - # 标记帧处理完成 - self.frame_queue.task_done() - time.sleep(0.02) - except queue.Empty: - # 队列为空,继续等待 - time.sleep(0.02) - continue except Exception as e: logger.error(f"[ERROR] HLSFrameProcessor loop error: {e}") time.sleep(1) # 出错后等待1秒再继续 @@ -324,6 +222,47 @@ class HLSFrameProcessor(threading.Thread): time.sleep(3) # 重新启动线程 self.run() + + def process_segment_with_rate_control(self, segment_path): + """处理单个TS分片,加入实时帧率控制""" + try: + container = av.open(segment_path) + video_stream = container.streams.video[0] + + # 如果是第一个分片,需要设置base_pts + if self.base_pts is None: + # 读取第一帧来获取PTS + for packet in container.demux(video_stream): + for frame in packet.decode(): + if frame.pts is not None: + self.base_pts = frame.pts * video_stream.time_base * 1000 + break + break + # 重置容器到开始 + container.close() + container = av.open(segment_path) + video_stream = container.streams.video[0] + + # 处理每一帧 + for packet in container.demux(video_stream): + for frame in packet.decode(): + if frame.pts is not None and not self.stop_event.is_set(): + # 计算PTS时间(毫秒) + pts_ms = frame.pts * video_stream.time_base * 1000 + + # 转换为numpy数组 + frame_np = frame.to_ndarray(format='bgr24') + + # 实时处理帧,加入帧率控制 + self.process_frame_with_rate_control(frame_np, pts_ms) + + container.close() + + except Exception as e: + logger.error(f"[ERROR] Failed to process TS file {segment_path}: {e}") + + + # ========================= 服务主类 ========================= @@ -335,11 +274,9 @@ class HLSKadianService: for c in cfg.get("cameras", [])] self.stop_event = threading.Event() - self.frame_queue = queue.Queue(maxsize=2000) # 帧队列,容量较大 self.raw_queue = queue.Queue(maxsize=3) # 原始队列,容量较小 self.ws_queue = queue.Queue(maxsize=1000) # WebSocket队列 - self.ts_reader_workers = [] self.frame_processor_workers = [] self.biz_processor = FrameProcessorWorker(self.raw_queue, self.ws_queue, self.stop_event) self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, WS_HOST, WS_PORT) @@ -351,28 +288,20 @@ class HLSKadianService: self.ws_sender.start() self.biz_processor.start() - # 为每个摄像头启动TS读取线程和帧处理线程 + # 为每个摄像头启动合并后的帧处理线程 for cam in self.cameras: - # 启动TS读取线程 - ts_reader = TSReaderWorker(cam, self.frame_queue, self.stop_event) - ts_reader.start() - self.ts_reader_workers.append(ts_reader) - - # 启动帧处理线程 - frame_processor = HLSFrameProcessor(cam, self.frame_queue, self.raw_queue, self.stop_event) + # 启动合并后的帧处理线程(包含TS读取和帧率控制) + frame_processor = HLSFrameProcessor(cam, self.raw_queue, self.stop_event) frame_processor.start() self.frame_processor_workers.append(frame_processor) - logger.info("[INFO] HLS Kadian Service started") + logger.info("[INFO] HLS Kadian Service started (merged TS reader and frame processor)") def stop(self): self.stop_event.set() - self.frame_queue.join() self.raw_queue.join() self.ws_queue.join() - for w in self.ts_reader_workers: - w.join(timeout=2.0) for w in self.frame_processor_workers: w.join(timeout=2.0)