diff --git a/hls_service_ws_kadian.py b/hls_service_ws_kadian.py index c34ee4a..55e4bb3 100644 --- a/hls_service_ws_kadian.py +++ b/hls_service_ws_kadian.py @@ -156,8 +156,7 @@ class TSReaderWorker(threading.Thread): if not current_frames: logger.error("[ERROR] Failed to initialize playback") return - - frame_index = 0 + while not self.stop_event.is_set(): try: @@ -165,66 +164,54 @@ class TSReaderWorker(threading.Thread): if self.frame_queue.qsize() >= self.queue_threshold: time.sleep(0.2) continue - - if frame_index >= len(current_frames): - # 当前分片播放完毕,查找下一个分片 - next_segment = self.find_next_segment() - - if next_segment is not None: - # 读取下一个分片 - # logger.info(f"[INFO] Starting to read TS segment: {next_segment}") - # start_time = time.time() - - current_frames = self.read_ts_frames(next_segment) - - # elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒 - # logger.info(f"[INFO] Finished reading TS segment, took {elapsed_time:.1f}ms") - if current_frames: - self.current_segment_num += 1 - frame_index = 0 - logger.info(f"[INFO] Switching to segment {self.current_segment_num}") - else: - logger.warning(f"[WARN] Failed to read next segment, waiting...") - time.sleep(0.5) - continue + # 当前分片播放完毕,查找下一个分片 + next_segment = self.find_next_segment() + + if next_segment is not None: + # 读取下一个分片 + # logger.info(f"[INFO] Starting to read TS segment: {next_segment}") + # start_time = time.time() + + current_frames = self.read_ts_frames(next_segment) + + # elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒 + # logger.info(f"[INFO] Finished reading TS segment, took {elapsed_time:.1f}ms") + + if current_frames: + self.current_segment_num += 1 + frame_index = 0 + logger.info(f"[INFO] Switching to segment {self.current_segment_num}") else: - # 下一个分片不存在,等待 - logger.warning(f"[WARN] No next segment found, waiting...") + logger.warning(f"[WARN] Failed to read next segment, waiting...") time.sleep(0.5) continue + else: + # 下一个分片不存在,等待 + logger.warning(f"[WARN] No next segment found, waiting...") + time.sleep(0.5) + continue - # 获取当前帧 - frame_data = current_frames[frame_index] - frame = frame_data['frame'] - pts_ms = frame_data['pts_ms'] - - # 计算预期的播放时间 - expected_play_time = self.start_time + (pts_ms - self.base_pts) / 1000.0 - - # 准备帧数据 - item = { - "camera_id": self.camera_cfg.id, - "camera_name": self.camera_cfg.name, - "timestamp": expected_play_time, # 使用预期的播放时间 - "frame": frame, - "pts_ms": pts_ms - } - - try: - # 队列满时处理 - if self.frame_queue.full(): - try: - self.frame_queue.get_nowait() - self.frame_queue.task_done() - except queue.Empty: - pass + # 一次性将所有帧放入队列 + for frame_data in current_frames: + item = { + "frame": frame_data['frame'], + "pts_ms": frame_data['pts_ms'] + } - self.frame_queue.put(item, timeout=0.5) - frame_index += 1 - - except queue.Full: - logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}") + try: + # 队列满时处理 + if self.frame_queue.full(): + try: + self.frame_queue.get_nowait() + except queue.Empty: + pass + + self.frame_queue.put(item, timeout=0.5) + + except queue.Full: + logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}") + break # 如果队列满,停止放入更多帧 except Exception as e: logger.error(f"[ERROR] TSReaderWorker loop error: {e}") @@ -299,8 +286,8 @@ class HLSFrameProcessor(threading.Thread): # 时间同步良好,放入raw_queue item = { - "camera_id": frame_data["camera_id"], - "camera_name": frame_data["camera_name"], + "camera_id": self.camera_cfg.id, + "camera_name": self.camera_cfg.name, "timestamp": current_time, "frame": frame_data["frame"], } @@ -317,7 +304,7 @@ class HLSFrameProcessor(threading.Thread): self.raw_queue.put(item, timeout=0.5) except queue.Full: - logger.warning(f"[WARN] Raw queue full, dropping frame from {frame_data['camera_name']}") + logger.warning(f"[WARN] Raw queue full, dropping frame from {self.camera_cfg.name}") # 标记帧处理完成 self.frame_queue.task_done()