删除无用代码,增加日志打印,修改部分参数

This commit is contained in:
zqc
2026-02-04 16:27:16 +08:00
parent ab9b13dcb1
commit 4625f3dfb8

View File

@@ -48,7 +48,7 @@ class TSReaderWorker(threading.Thread):
self.base_pts = None # 第一个帧的PTS基准
# 队列阈值控制
self.queue_threshold = 50 # 队列中帧数量阈值
self.queue_threshold = 150 # 队列中帧数量阈值
def find_segment_files(self):
"""查找TS分片文件返回排序后的文件名列表"""
@@ -172,7 +172,14 @@ class TSReaderWorker(threading.Thread):
if next_segment is not None:
# 读取下一个分片
# logger.info(f"[INFO] Starting to read TS segment: {next_segment}")
# start_time = time.time()
current_frames = self.read_ts_frames(next_segment)
# elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒
# logger.info(f"[INFO] Finished reading TS segment, took {elapsed_time:.1f}ms")
if current_frames:
self.current_segment_num += 1
frame_index = 0
@@ -183,8 +190,8 @@ class TSReaderWorker(threading.Thread):
continue
else:
# 下一个分片不存在,等待
time.sleep(0.5)
logger.warning(f"[WARN] No next segment found, waiting...")
time.sleep(0.5)
continue
# 获取当前帧
@@ -243,6 +250,7 @@ class HLSFrameProcessor(threading.Thread):
# 时间同步相关
self.start_time = None
self.base_pts = None
self.last_process_time = None # 上次处理时间
def run(self):
"""主循环从帧队列获取帧按照PTS时间间隔放入raw_queue"""
@@ -258,6 +266,13 @@ class HLSFrameProcessor(threading.Thread):
self.base_pts = frame_data['pts_ms']
logger.info("[INFO] Frame processor initialized time base")
# # 打印距离上次处理过去的时间
# current_time = time.time()
# if self.last_process_time is not None:
# time_since_last = (current_time - self.last_process_time) * 1000 # 转换为毫秒
# logger.info(f"[INFO] Time since last frame: {time_since_last:.1f}ms")
# self.last_process_time = current_time
# 计算预期的播放时间
expected_play_time = self.start_time + (frame_data['pts_ms'] - self.base_pts) / 1000.0
current_time = time.time()
@@ -266,12 +281,12 @@ class HLSFrameProcessor(threading.Thread):
time_diff = current_time - expected_play_time
# 时间同步策略
if time_diff > 0.02: # 超过20ms播放落后
if time_diff > 0.06: # 超过60ms播放落后 3帧
# 跳过这一帧追赶
logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping frame")
self.frame_queue.task_done()
continue
elif time_diff < -0.02: # 超前20ms需要等待
elif time_diff < -0.06: # 超前60ms需要等待 3帧
# 等待到正确的时间点
wait_time = -time_diff
if wait_time > 0.1: # 如果等待时间过长,重置时间基准
@@ -306,9 +321,10 @@ class HLSFrameProcessor(threading.Thread):
# 标记帧处理完成
self.frame_queue.task_done()
time.sleep(0.02)
except queue.Empty:
# 队列为空,继续等待
time.sleep(0.02)
continue
except Exception as e:
logger.error(f"[ERROR] HLSFrameProcessor loop error: {e}")
@@ -321,194 +337,6 @@ class HLSFrameProcessor(threading.Thread):
# 重新启动线程
self.run()
def find_segment_files(self):
"""查找TS分片文件返回排序后的文件名列表"""
pattern = os.path.join(HLS_SEGMENT_DIR, "segment_*.ts")
files = glob.glob(pattern)
# 按文件名中的数字排序
files.sort(key=lambda x: int(x.split('_')[-1].split('.')[0]))
return files
def get_segment_number(self, filename):
"""从文件名中提取序号"""
return int(filename.split('_')[-1].split('.')[0])
def find_nth_largest_segment(self, n):
"""查找第n大的TS分片"""
files = self.find_segment_files()
if len(files) < n:
return None
# 返回第n大的文件从大到小排序
files_sorted = sorted(files, key=self.get_segment_number, reverse=True)
return files_sorted[n-1] if n-1 < len(files_sorted) else None
def read_ts_frames(self, ts_file):
"""读取TS文件中的所有帧返回帧列表和PTS信息"""
frames_with_pts = []
try:
container = av.open(ts_file)
video_stream = container.streams.video[0]
for packet in container.demux(video_stream):
for frame in packet.decode():
if frame.pts is not None:
# 计算PTS时间毫秒
pts_ms = frame.pts * video_stream.time_base * 1000
# 转换为numpy数组
frame_np = frame.to_ndarray(format='bgr24')
frames_with_pts.append({
'frame': frame_np,
'pts_ms': pts_ms,
'timestamp': time.time() # 当前系统时间
})
container.close()
return frames_with_pts
except Exception as e:
logger.error(f"[ERROR] Failed to read TS file {ts_file}: {e}")
return []
def initialize_playback(self):
"""初始化播放找到第3大的TS分片建立时间基准"""
while not self.stop_event.is_set():
# 查找第3大的TS分片
third_largest = self.find_nth_largest_segment(3)
if third_largest is not None:
segment_num = self.get_segment_number(third_largest)
logger.info(f"[INFO] Found segment {segment_num}, starting playback")
# 读取该分片的所有帧
frames = self.read_ts_frames(third_largest)
if frames:
# 设置基准时间
self.current_segment_num = segment_num
self.start_time = time.time()
self.base_pts = frames[0]['pts_ms']
return frames
else:
logger.warning(f"[WARN] No frames found in segment {segment_num}")
# 等待0.5秒后重试
time.sleep(0.5)
logger.warning(f"[WARN] No frames found, waiting...")
return []
def find_next_segment(self):
"""查找下一个TS分片"""
next_segment_num = self.current_segment_num + 1
next_segment_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{next_segment_num:09d}.ts")
# 检查+1, +2, +3是否存在
check_segments = [
next_segment_num + 0, # +1
next_segment_num + 1, # +2
next_segment_num + 2 # +3
]
for seg_num in check_segments:
seg_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{seg_num:09d}.ts")
if not os.path.exists(seg_path):
return None
# 所有检查的分片都存在,返回下一个分片
return next_segment_path
def run(self):
"""主循环模拟实时播放HLS流"""
# 初始化播放
current_frames = self.initialize_playback()
if not current_frames:
logger.error("[ERROR] Failed to initialize playback")
return
frame_index = 0
while not self.stop_event.is_set():
if frame_index >= len(current_frames):
# 当前分片播放完毕,查找下一个分片
next_segment = self.find_next_segment()
if next_segment is not None:
# 读取下一个分片
current_frames = self.read_ts_frames(next_segment)
if current_frames:
self.current_segment_num += 1
frame_index = 0
logger.info(f"[INFO] Switching to segment {self.current_segment_num}")
else:
logger.warning(f"[WARN] Failed to read next segment, waiting...")
time.sleep(0.5)
continue
else:
# 下一个分片不存在,等待
time.sleep(0.5)
logger.warning(f"[WARN] No next segment found, waiting...")
continue
# 获取当前帧
frame_data = current_frames[frame_index]
frame = frame_data['frame']
pts_ms = frame_data['pts_ms']
# 计算预期的播放时间
expected_play_time = self.start_time + (pts_ms - self.base_pts) / 1000.0
current_time = time.time()
# 计算时间差
time_diff = current_time - expected_play_time
# 时间同步策略
if time_diff > 0.02: # 超过20ms播放落后
# 跳过一些帧追赶
skip_count = min(int(time_diff * RTSP_TARGET_FPS), len(current_frames) - frame_index - 1)
if skip_count > 0:
logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping {skip_count} frames")
frame_index += skip_count
continue
elif time_diff < -0.02: # 超前20ms需要等待
# 等待到正确的时间点
wait_time = -time_diff
if wait_time > 0.1: # 如果等待时间过长,重置时间基准
logger.info(f"[WARN] Too far ahead, resetting time base")
self.start_time = current_time
self.base_pts = pts_ms
else:
time.sleep(wait_time)
logger.info(f"[DEBUG] Waiting for {wait_time*1000:.1f}ms")
continue
# 时间同步良好,放入队列
item = {
"camera_id": self.camera_cfg.id,
"camera_name": self.camera_cfg.name,
"timestamp": current_time,
"frame": frame,
}
try:
# 队列满时处理
if self.raw_queue.full():
try:
self.raw_queue.get_nowait()
self.raw_queue.task_done()
except queue.Empty:
pass
self.raw_queue.put(item, timeout=0.5)
frame_index += 1
except queue.Full:
logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}")
# 控制处理速度
time.sleep(0.02) # 最小间隔
# ========================= 服务主类 =========================
class HLSKadianService:
@@ -519,7 +347,7 @@ class HLSKadianService:
for c in cfg.get("cameras", [])]
self.stop_event = threading.Event()
self.frame_queue = queue.Queue(maxsize=50) # 帧队列,容量较大
self.frame_queue = queue.Queue(maxsize=2000) # 帧队列,容量较大
self.raw_queue = queue.Queue(maxsize=3) # 原始队列,容量较小
self.ws_queue = queue.Queue(maxsize=1000) # WebSocket队列