移除不必要的数据

This commit is contained in:
zqc
2026-02-04 16:42:05 +08:00
parent 4625f3dfb8
commit d9d681181e

View File

@@ -156,8 +156,7 @@ class TSReaderWorker(threading.Thread):
if not current_frames: if not current_frames:
logger.error("[ERROR] Failed to initialize playback") logger.error("[ERROR] Failed to initialize playback")
return return
frame_index = 0
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
@@ -165,66 +164,54 @@ class TSReaderWorker(threading.Thread):
if self.frame_queue.qsize() >= self.queue_threshold: if self.frame_queue.qsize() >= self.queue_threshold:
time.sleep(0.2) time.sleep(0.2)
continue continue
if frame_index >= len(current_frames):
# 当前分片播放完毕,查找下一个分片
next_segment = self.find_next_segment()
if next_segment is not None:
# 读取下一个分片
# logger.info(f"[INFO] Starting to read TS segment: {next_segment}")
# start_time = time.time()
current_frames = self.read_ts_frames(next_segment)
# elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒
# logger.info(f"[INFO] Finished reading TS segment, took {elapsed_time:.1f}ms")
if current_frames: # 当前分片播放完毕,查找下一个分片
self.current_segment_num += 1 next_segment = self.find_next_segment()
frame_index = 0
logger.info(f"[INFO] Switching to segment {self.current_segment_num}") if next_segment is not None:
else: # 读取下一个分片
logger.warning(f"[WARN] Failed to read next segment, waiting...") # logger.info(f"[INFO] Starting to read TS segment: {next_segment}")
time.sleep(0.5) # start_time = time.time()
continue
current_frames = self.read_ts_frames(next_segment)
# elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒
# logger.info(f"[INFO] Finished reading TS segment, took {elapsed_time:.1f}ms")
if current_frames:
self.current_segment_num += 1
frame_index = 0
logger.info(f"[INFO] Switching to segment {self.current_segment_num}")
else: else:
# 下一个分片不存在,等待 logger.warning(f"[WARN] Failed to read next segment, waiting...")
logger.warning(f"[WARN] No next segment found, waiting...")
time.sleep(0.5) time.sleep(0.5)
continue continue
else:
# 下一个分片不存在,等待
logger.warning(f"[WARN] No next segment found, waiting...")
time.sleep(0.5)
continue
# 获取当前帧 # 一次性将所有帧放入队列
frame_data = current_frames[frame_index] for frame_data in current_frames:
frame = frame_data['frame'] item = {
pts_ms = frame_data['pts_ms'] "frame": frame_data['frame'],
"pts_ms": frame_data['pts_ms']
# 计算预期的播放时间 }
expected_play_time = self.start_time + (pts_ms - self.base_pts) / 1000.0
# 准备帧数据
item = {
"camera_id": self.camera_cfg.id,
"camera_name": self.camera_cfg.name,
"timestamp": expected_play_time, # 使用预期的播放时间
"frame": frame,
"pts_ms": pts_ms
}
try:
# 队列满时处理
if self.frame_queue.full():
try:
self.frame_queue.get_nowait()
self.frame_queue.task_done()
except queue.Empty:
pass
self.frame_queue.put(item, timeout=0.5) try:
frame_index += 1 # 队列满时处理
if self.frame_queue.full():
except queue.Full: try:
logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}") self.frame_queue.get_nowait()
except queue.Empty:
pass
self.frame_queue.put(item, timeout=0.5)
except queue.Full:
logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}")
break # 如果队列满,停止放入更多帧
except Exception as e: except Exception as e:
logger.error(f"[ERROR] TSReaderWorker loop error: {e}") logger.error(f"[ERROR] TSReaderWorker loop error: {e}")
@@ -299,8 +286,8 @@ class HLSFrameProcessor(threading.Thread):
# 时间同步良好放入raw_queue # 时间同步良好放入raw_queue
item = { item = {
"camera_id": frame_data["camera_id"], "camera_id": self.camera_cfg.id,
"camera_name": frame_data["camera_name"], "camera_name": self.camera_cfg.name,
"timestamp": current_time, "timestamp": current_time,
"frame": frame_data["frame"], "frame": frame_data["frame"],
} }
@@ -317,7 +304,7 @@ class HLSFrameProcessor(threading.Thread):
self.raw_queue.put(item, timeout=0.5) self.raw_queue.put(item, timeout=0.5)
except queue.Full: except queue.Full:
logger.warning(f"[WARN] Raw queue full, dropping frame from {frame_data['camera_name']}") logger.warning(f"[WARN] Raw queue full, dropping frame from {self.camera_cfg.name}")
# 标记帧处理完成 # 标记帧处理完成
self.frame_queue.task_done() self.frame_queue.task_done()