From ab9b13dcb128cfebe2d9ff2dc774f93494a14bd2 Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Wed, 4 Feb 2026 15:24:52 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AF=BB=E5=8F=96ts=E7=BA=BF=E7=A8=8B=E5=92=8C?= =?UTF-8?q?=E5=B8=A7=E5=A4=84=E7=90=86=E7=BA=BF=E7=A8=8B=E5=88=86=E5=BC=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hls_service_ws_kadian.py | 318 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 304 insertions(+), 14 deletions(-) diff --git a/hls_service_ws_kadian.py b/hls_service_ws_kadian.py index 389189c..8df7dd3 100644 --- a/hls_service_ws_kadian.py +++ b/hls_service_ws_kadian.py @@ -34,12 +34,12 @@ class CameraConfig: index: str -# ========================= HLS 抓帧线程 ========================= -class HLSCaptureWorker(threading.Thread): - def __init__(self, camera_cfg: CameraConfig, raw_queue: queue.Queue, stop_event: threading.Event): +# ========================= TS 文件读取线程 ========================= +class TSReaderWorker(threading.Thread): + def __init__(self, camera_cfg: CameraConfig, frame_queue: queue.Queue, stop_event: threading.Event): super().__init__(daemon=True) self.camera_cfg = camera_cfg - self.raw_queue = raw_queue + self.frame_queue = frame_queue self.stop_event = stop_event # HLS状态变量 @@ -47,6 +47,280 @@ class HLSCaptureWorker(threading.Thread): self.start_time = None # 播放开始时间 self.base_pts = None # 第一个帧的PTS基准 + # 队列阈值控制 + self.queue_threshold = 50 # 队列中帧数量阈值 + + def find_segment_files(self): + """查找TS分片文件,返回排序后的文件名列表""" + pattern = os.path.join(HLS_SEGMENT_DIR, "segment_*.ts") + files = glob.glob(pattern) + # 按文件名中的数字排序 + files.sort(key=lambda x: int(x.split('_')[-1].split('.')[0])) + return files + + def get_segment_number(self, filename): + """从文件名中提取序号""" + return int(filename.split('_')[-1].split('.')[0]) + + def find_nth_largest_segment(self, n): + """查找第n大的TS分片""" + files = self.find_segment_files() + if len(files) < n: + return None + # 返回第n大的文件(从大到小排序) + files_sorted = sorted(files, key=self.get_segment_number, reverse=True) + return files_sorted[n-1] if n-1 < len(files_sorted) else None + + def read_ts_frames(self, ts_file): + """读取TS文件中的所有帧,返回帧列表和PTS信息""" + frames_with_pts = [] + + try: + container = av.open(ts_file) + video_stream = container.streams.video[0] + + for packet in container.demux(video_stream): + for frame in packet.decode(): + if frame.pts is not None: + # 计算PTS时间(毫秒) + pts_ms = frame.pts * video_stream.time_base * 1000 + + # 转换为numpy数组 + frame_np = frame.to_ndarray(format='bgr24') + + frames_with_pts.append({ + 'frame': frame_np, + 'pts_ms': pts_ms, + 'timestamp': time.time() # 当前系统时间 + }) + + container.close() + return frames_with_pts + + except Exception as e: + logger.error(f"[ERROR] Failed to read TS file {ts_file}: {e}") + return [] + + def initialize_playback(self): + """初始化播放:找到第3大的TS分片,建立时间基准""" + while not self.stop_event.is_set(): + # 查找第3大的TS分片 + third_largest = self.find_nth_largest_segment(3) + + if third_largest is not None: + segment_num = self.get_segment_number(third_largest) + logger.info(f"[INFO] Found segment {segment_num}, starting playback") + + # 读取该分片的所有帧 + frames = self.read_ts_frames(third_largest) + if frames: + # 设置基准时间 + self.current_segment_num = segment_num + self.start_time = time.time() + self.base_pts = frames[0]['pts_ms'] + return frames + else: + logger.warning(f"[WARN] No frames found in segment {segment_num}") + + # 等待0.5秒后重试 + time.sleep(0.5) + logger.warning(f"[WARN] No frames found, waiting...") + + return [] + + def find_next_segment(self): + """查找下一个TS分片""" + next_segment_num = self.current_segment_num + 1 + next_segment_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{next_segment_num:09d}.ts") + + # 检查+1, +2, +3是否存在 + check_segments = [ + next_segment_num + 0, # +1 + next_segment_num + 1, # +2 + next_segment_num + 2 # +3 + ] + + for seg_num in check_segments: + seg_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{seg_num:09d}.ts") + if not os.path.exists(seg_path): + return None + + # 所有检查的分片都存在,返回下一个分片 + return next_segment_path + + def run(self): + """主循环:读取TS文件并将帧放入队列""" + try: + # 初始化播放 + current_frames = self.initialize_playback() + if not current_frames: + logger.error("[ERROR] Failed to initialize playback") + return + + frame_index = 0 + + while not self.stop_event.is_set(): + try: + # 检查队列大小,如果超过阈值则等待 + if self.frame_queue.qsize() >= self.queue_threshold: + time.sleep(0.2) + continue + + if frame_index >= len(current_frames): + # 当前分片播放完毕,查找下一个分片 + next_segment = self.find_next_segment() + + if next_segment is not None: + # 读取下一个分片 + current_frames = self.read_ts_frames(next_segment) + if current_frames: + self.current_segment_num += 1 + frame_index = 0 + logger.info(f"[INFO] Switching to segment {self.current_segment_num}") + else: + logger.warning(f"[WARN] Failed to read next segment, waiting...") + time.sleep(0.5) + continue + else: + # 下一个分片不存在,等待 + time.sleep(0.5) + logger.warning(f"[WARN] No next segment found, waiting...") + continue + + # 获取当前帧 + frame_data = current_frames[frame_index] + frame = frame_data['frame'] + pts_ms = frame_data['pts_ms'] + + # 计算预期的播放时间 + expected_play_time = self.start_time + (pts_ms - self.base_pts) / 1000.0 + + # 准备帧数据 + item = { + "camera_id": self.camera_cfg.id, + "camera_name": self.camera_cfg.name, + "timestamp": expected_play_time, # 使用预期的播放时间 + "frame": frame, + "pts_ms": pts_ms + } + + try: + # 队列满时处理 + if self.frame_queue.full(): + try: + self.frame_queue.get_nowait() + self.frame_queue.task_done() + except queue.Empty: + pass + + self.frame_queue.put(item, timeout=0.5) + frame_index += 1 + + except queue.Full: + logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}") + + except Exception as e: + logger.error(f"[ERROR] TSReaderWorker loop error: {e}") + time.sleep(1) # 出错后等待1秒再继续 + + except Exception as e: + logger.error(f"[ERROR] TSReaderWorker main error: {e}") + logger.info("[INFO] TSReaderWorker will restart in 3 seconds...") + time.sleep(3) + # 重新启动线程 + self.run() + + +# ========================= 帧处理线程 ========================= +class HLSFrameProcessor(threading.Thread): + def __init__(self, camera_cfg: CameraConfig, frame_queue: queue.Queue, raw_queue: queue.Queue, stop_event: threading.Event): + super().__init__(daemon=True) + self.camera_cfg = camera_cfg + self.frame_queue = frame_queue + self.raw_queue = raw_queue + self.stop_event = stop_event + + # 时间同步相关 + self.start_time = None + self.base_pts = None + + def run(self): + """主循环:从帧队列获取帧,按照PTS时间间隔放入raw_queue""" + try: + while not self.stop_event.is_set(): + try: + # 从帧队列获取帧数据(阻塞等待) + frame_data = self.frame_queue.get(timeout=1.0) + + # 初始化时间基准 + if self.start_time is None: + self.start_time = time.time() + self.base_pts = frame_data['pts_ms'] + logger.info("[INFO] Frame processor initialized time base") + + # 计算预期的播放时间 + expected_play_time = self.start_time + (frame_data['pts_ms'] - self.base_pts) / 1000.0 + current_time = time.time() + + # 计算时间差 + time_diff = current_time - expected_play_time + + # 时间同步策略 + if time_diff > 0.02: # 超过20ms,播放落后 + # 跳过这一帧追赶 + logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping frame") + self.frame_queue.task_done() + continue + elif time_diff < -0.02: # 超前20ms,需要等待 + # 等待到正确的时间点 + wait_time = -time_diff + if wait_time > 0.1: # 如果等待时间过长,重置时间基准 + logger.info(f"[WARN] Too far ahead, resetting time base") + self.start_time = current_time + self.base_pts = frame_data['pts_ms'] + else: + time.sleep(wait_time) + logger.info(f"[DEBUG] Waiting for {wait_time*1000:.1f}ms") + + # 时间同步良好,放入raw_queue + item = { + "camera_id": frame_data["camera_id"], + "camera_name": frame_data["camera_name"], + "timestamp": current_time, + "frame": frame_data["frame"], + } + + try: + # raw_queue满时处理 + if self.raw_queue.full(): + try: + self.raw_queue.get_nowait() + self.raw_queue.task_done() + except queue.Empty: + pass + + self.raw_queue.put(item, timeout=0.5) + + except queue.Full: + logger.warning(f"[WARN] Raw queue full, dropping frame from {frame_data['camera_name']}") + + # 标记帧处理完成 + self.frame_queue.task_done() + + except queue.Empty: + # 队列为空,继续等待 + continue + except Exception as e: + logger.error(f"[ERROR] HLSFrameProcessor loop error: {e}") + time.sleep(1) # 出错后等待1秒再继续 + + except Exception as e: + logger.error(f"[ERROR] HLSFrameProcessor main error: {e}") + logger.info("[INFO] HLSFrameProcessor will restart in 3 seconds...") + time.sleep(3) + # 重新启动线程 + self.run() + def find_segment_files(self): """查找TS分片文件,返回排序后的文件名列表""" pattern = os.path.join(HLS_SEGMENT_DIR, "segment_*.ts") @@ -245,11 +519,13 @@ class HLSKadianService: for c in cfg.get("cameras", [])] self.stop_event = threading.Event() - self.raw_queue = queue.Queue(maxsize=3) - self.ws_queue = queue.Queue(maxsize=1000) + self.frame_queue = queue.Queue(maxsize=50) # 帧队列,容量较大 + self.raw_queue = queue.Queue(maxsize=3) # 原始队列,容量较小 + self.ws_queue = queue.Queue(maxsize=1000) # WebSocket队列 - self.capture_workers = [] - self.processor = FrameProcessorWorker(self.raw_queue, self.ws_queue, self.stop_event) + self.ts_reader_workers = [] + self.frame_processor_workers = [] + self.biz_processor = FrameProcessorWorker(self.raw_queue, self.ws_queue, self.stop_event) self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, WS_HOST, WS_PORT) def start(self): @@ -257,20 +533,34 @@ class HLSKadianService: os.makedirs(HLS_SEGMENT_DIR, exist_ok=True) self.ws_sender.start() - self.processor.start() + self.biz_processor.start() + + # 为每个摄像头启动TS读取线程和帧处理线程 for cam in self.cameras: - w = HLSCaptureWorker(cam, self.raw_queue, self.stop_event) - w.start() - self.capture_workers.append(w) + # 启动TS读取线程 + ts_reader = TSReaderWorker(cam, self.frame_queue, self.stop_event) + ts_reader.start() + self.ts_reader_workers.append(ts_reader) + + # 启动帧处理线程 + frame_processor = HLSFrameProcessor(cam, self.frame_queue, self.raw_queue, self.stop_event) + frame_processor.start() + self.frame_processor_workers.append(frame_processor) + logger.info("[INFO] HLS Kadian Service started") def stop(self): self.stop_event.set() + self.frame_queue.join() self.raw_queue.join() self.ws_queue.join() - for w in self.capture_workers: + + for w in self.ts_reader_workers: w.join(timeout=2.0) - self.processor.join(timeout=2.0) + for w in self.frame_processor_workers: + w.join(timeout=2.0) + + self.biz_processor.join(timeout=2.0) self.ws_sender.join(timeout=2.0) logger.info("[INFO] Service stopped")