diff --git a/hls_service_ws_kadian.py b/hls_service_ws_kadian.py index 8df7dd3..c34ee4a 100644 --- a/hls_service_ws_kadian.py +++ b/hls_service_ws_kadian.py @@ -48,7 +48,7 @@ class TSReaderWorker(threading.Thread): self.base_pts = None # 第一个帧的PTS基准 # 队列阈值控制 - self.queue_threshold = 50 # 队列中帧数量阈值 + self.queue_threshold = 150 # 队列中帧数量阈值 def find_segment_files(self): """查找TS分片文件,返回排序后的文件名列表""" @@ -172,7 +172,14 @@ class TSReaderWorker(threading.Thread): if next_segment is not None: # 读取下一个分片 + # logger.info(f"[INFO] Starting to read TS segment: {next_segment}") + # start_time = time.time() + current_frames = self.read_ts_frames(next_segment) + + # elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒 + # logger.info(f"[INFO] Finished reading TS segment, took {elapsed_time:.1f}ms") + if current_frames: self.current_segment_num += 1 frame_index = 0 @@ -183,8 +190,8 @@ class TSReaderWorker(threading.Thread): continue else: # 下一个分片不存在,等待 - time.sleep(0.5) logger.warning(f"[WARN] No next segment found, waiting...") + time.sleep(0.5) continue # 获取当前帧 @@ -243,6 +250,7 @@ class HLSFrameProcessor(threading.Thread): # 时间同步相关 self.start_time = None self.base_pts = None + self.last_process_time = None # 上次处理时间 def run(self): """主循环:从帧队列获取帧,按照PTS时间间隔放入raw_queue""" @@ -257,21 +265,28 @@ class HLSFrameProcessor(threading.Thread): self.start_time = time.time() self.base_pts = frame_data['pts_ms'] logger.info("[INFO] Frame processor initialized time base") - + + # # 打印距离上次处理过去的时间 + # current_time = time.time() + # if self.last_process_time is not None: + # time_since_last = (current_time - self.last_process_time) * 1000 # 转换为毫秒 + # logger.info(f"[INFO] Time since last frame: {time_since_last:.1f}ms") + # self.last_process_time = current_time + # 计算预期的播放时间 expected_play_time = self.start_time + (frame_data['pts_ms'] - self.base_pts) / 1000.0 current_time = time.time() - + # 计算时间差 time_diff = current_time - expected_play_time # 时间同步策略 - if time_diff > 0.02: # 超过20ms,播放落后 + if time_diff > 0.06: # 超过60ms,播放落后 (3帧) # 跳过这一帧追赶 logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping frame") self.frame_queue.task_done() continue - elif time_diff < -0.02: # 超前20ms,需要等待 + elif time_diff < -0.06: # 超前60ms,需要等待 (3帧) # 等待到正确的时间点 wait_time = -time_diff if wait_time > 0.1: # 如果等待时间过长,重置时间基准 @@ -306,9 +321,10 @@ class HLSFrameProcessor(threading.Thread): # 标记帧处理完成 self.frame_queue.task_done() - + time.sleep(0.02) except queue.Empty: # 队列为空,继续等待 + time.sleep(0.02) continue except Exception as e: logger.error(f"[ERROR] HLSFrameProcessor loop error: {e}") @@ -320,194 +336,6 @@ class HLSFrameProcessor(threading.Thread): time.sleep(3) # 重新启动线程 self.run() - - def find_segment_files(self): - """查找TS分片文件,返回排序后的文件名列表""" - pattern = os.path.join(HLS_SEGMENT_DIR, "segment_*.ts") - files = glob.glob(pattern) - # 按文件名中的数字排序 - files.sort(key=lambda x: int(x.split('_')[-1].split('.')[0])) - return files - - def get_segment_number(self, filename): - """从文件名中提取序号""" - return int(filename.split('_')[-1].split('.')[0]) - - def find_nth_largest_segment(self, n): - """查找第n大的TS分片""" - files = self.find_segment_files() - if len(files) < n: - return None - # 返回第n大的文件(从大到小排序) - files_sorted = sorted(files, key=self.get_segment_number, reverse=True) - return files_sorted[n-1] if n-1 < len(files_sorted) else None - - def read_ts_frames(self, ts_file): - """读取TS文件中的所有帧,返回帧列表和PTS信息""" - frames_with_pts = [] - - try: - container = av.open(ts_file) - video_stream = container.streams.video[0] - - for packet in container.demux(video_stream): - for frame in packet.decode(): - if frame.pts is not None: - # 计算PTS时间(毫秒) - pts_ms = frame.pts * video_stream.time_base * 1000 - - # 转换为numpy数组 - frame_np = frame.to_ndarray(format='bgr24') - - frames_with_pts.append({ - 'frame': frame_np, - 'pts_ms': pts_ms, - 'timestamp': time.time() # 当前系统时间 - }) - - container.close() - return frames_with_pts - - except Exception as e: - logger.error(f"[ERROR] Failed to read TS file {ts_file}: {e}") - return [] - - def initialize_playback(self): - """初始化播放:找到第3大的TS分片,建立时间基准""" - while not self.stop_event.is_set(): - # 查找第3大的TS分片 - third_largest = self.find_nth_largest_segment(3) - - if third_largest is not None: - segment_num = self.get_segment_number(third_largest) - logger.info(f"[INFO] Found segment {segment_num}, starting playback") - - # 读取该分片的所有帧 - frames = self.read_ts_frames(third_largest) - if frames: - # 设置基准时间 - self.current_segment_num = segment_num - self.start_time = time.time() - self.base_pts = frames[0]['pts_ms'] - return frames - else: - logger.warning(f"[WARN] No frames found in segment {segment_num}") - - # 等待0.5秒后重试 - time.sleep(0.5) - logger.warning(f"[WARN] No frames found, waiting...") - - return [] - - def find_next_segment(self): - """查找下一个TS分片""" - next_segment_num = self.current_segment_num + 1 - next_segment_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{next_segment_num:09d}.ts") - - # 检查+1, +2, +3是否存在 - check_segments = [ - next_segment_num + 0, # +1 - next_segment_num + 1, # +2 - next_segment_num + 2 # +3 - ] - - for seg_num in check_segments: - seg_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{seg_num:09d}.ts") - if not os.path.exists(seg_path): - return None - - # 所有检查的分片都存在,返回下一个分片 - return next_segment_path - - def run(self): - """主循环:模拟实时播放HLS流""" - # 初始化播放 - current_frames = self.initialize_playback() - if not current_frames: - logger.error("[ERROR] Failed to initialize playback") - return - - frame_index = 0 - - while not self.stop_event.is_set(): - if frame_index >= len(current_frames): - # 当前分片播放完毕,查找下一个分片 - next_segment = self.find_next_segment() - - if next_segment is not None: - # 读取下一个分片 - current_frames = self.read_ts_frames(next_segment) - if current_frames: - self.current_segment_num += 1 - frame_index = 0 - logger.info(f"[INFO] Switching to segment {self.current_segment_num}") - else: - logger.warning(f"[WARN] Failed to read next segment, waiting...") - time.sleep(0.5) - continue - else: - # 下一个分片不存在,等待 - time.sleep(0.5) - logger.warning(f"[WARN] No next segment found, waiting...") - continue - - # 获取当前帧 - frame_data = current_frames[frame_index] - frame = frame_data['frame'] - pts_ms = frame_data['pts_ms'] - - # 计算预期的播放时间 - expected_play_time = self.start_time + (pts_ms - self.base_pts) / 1000.0 - current_time = time.time() - - # 计算时间差 - time_diff = current_time - expected_play_time - - # 时间同步策略 - if time_diff > 0.02: # 超过20ms,播放落后 - # 跳过一些帧追赶 - skip_count = min(int(time_diff * RTSP_TARGET_FPS), len(current_frames) - frame_index - 1) - if skip_count > 0: - logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping {skip_count} frames") - frame_index += skip_count - continue - elif time_diff < -0.02: # 超前20ms,需要等待 - # 等待到正确的时间点 - wait_time = -time_diff - if wait_time > 0.1: # 如果等待时间过长,重置时间基准 - logger.info(f"[WARN] Too far ahead, resetting time base") - self.start_time = current_time - self.base_pts = pts_ms - else: - time.sleep(wait_time) - logger.info(f"[DEBUG] Waiting for {wait_time*1000:.1f}ms") - continue - - # 时间同步良好,放入队列 - item = { - "camera_id": self.camera_cfg.id, - "camera_name": self.camera_cfg.name, - "timestamp": current_time, - "frame": frame, - } - - try: - # 队列满时处理 - if self.raw_queue.full(): - try: - self.raw_queue.get_nowait() - self.raw_queue.task_done() - except queue.Empty: - pass - - self.raw_queue.put(item, timeout=0.5) - frame_index += 1 - - except queue.Full: - logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}") - - # 控制处理速度 - time.sleep(0.02) # 最小间隔 # ========================= 服务主类 ========================= @@ -519,7 +347,7 @@ class HLSKadianService: for c in cfg.get("cameras", [])] self.stop_event = threading.Event() - self.frame_queue = queue.Queue(maxsize=50) # 帧队列,容量较大 + self.frame_queue = queue.Queue(maxsize=2000) # 帧队列,容量较大 self.raw_queue = queue.Queue(maxsize=3) # 原始队列,容量较小 self.ws_queue = queue.Queue(maxsize=1000) # WebSocket队列