From 161d1140ba6a8d1f1698e9b1175722103785ff5e Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Wed, 4 Feb 2026 15:03:48 +0800 Subject: [PATCH] =?UTF-8?q?hls=E8=A7=A3=E6=9E=90=E9=9B=8F=E5=BD=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hls_service_ws_kadian.py | 285 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 hls_service_ws_kadian.py diff --git a/hls_service_ws_kadian.py b/hls_service_ws_kadian.py new file mode 100644 index 0000000..389189c --- /dev/null +++ b/hls_service_ws_kadian.py @@ -0,0 +1,285 @@ +# hls_service_kadian.py +# 基于HLS TS分片的卡点检测服务 +# 支持从本地TS分片读取帧,按照PTS时间间隔模拟实时流 + +import av +import os +import time +import threading +import queue +import yaml +import glob + +from dataclasses import dataclass + +from biz.checkpoint.checkpoint_biz import KadianDetector, RTSP_TARGET_FPS, ALERT_PUSH_INTERVAL, FrameProcessorWorker +from utils.web_socket_sender import WebSocketSender +from utils.logger import get_logger + +logger = get_logger(__name__) + +WS_HOST = "0.0.0.0" +WS_PORT = 8765 + +# HLS配置 +HLS_SEGMENT_DIR = "D:\\ProjectDoc\\Police\\data\\hls" # TS分片存储目录 +HLS_SEGMENT_PATTERN = "segment_%09d.ts" # TS文件命名模式 + + +# ========================= 数据结构 ========================= +@dataclass +class CameraConfig: + id: int + name: str + index: str + + +# ========================= HLS 抓帧线程 ========================= +class HLSCaptureWorker(threading.Thread): + def __init__(self, camera_cfg: CameraConfig, raw_queue: queue.Queue, stop_event: threading.Event): + super().__init__(daemon=True) + self.camera_cfg = camera_cfg + self.raw_queue = raw_queue + self.stop_event = stop_event + + # HLS状态变量 + self.current_segment_num = -1 # 当前处理的TS分片序号 + self.start_time = None # 播放开始时间 + self.base_pts = None # 第一个帧的PTS基准 + + def find_segment_files(self): + """查找TS分片文件,返回排序后的文件名列表""" + pattern = os.path.join(HLS_SEGMENT_DIR, "segment_*.ts") + files = glob.glob(pattern) + # 按文件名中的数字排序 + files.sort(key=lambda x: int(x.split('_')[-1].split('.')[0])) + return files + + def get_segment_number(self, filename): + """从文件名中提取序号""" + return int(filename.split('_')[-1].split('.')[0]) + + def find_nth_largest_segment(self, n): + """查找第n大的TS分片""" + files = self.find_segment_files() + if len(files) < n: + return None + # 返回第n大的文件(从大到小排序) + files_sorted = sorted(files, key=self.get_segment_number, reverse=True) + return files_sorted[n-1] if n-1 < len(files_sorted) else None + + def read_ts_frames(self, ts_file): + """读取TS文件中的所有帧,返回帧列表和PTS信息""" + frames_with_pts = [] + + try: + container = av.open(ts_file) + video_stream = container.streams.video[0] + + for packet in container.demux(video_stream): + for frame in packet.decode(): + if frame.pts is not None: + # 计算PTS时间(毫秒) + pts_ms = frame.pts * video_stream.time_base * 1000 + + # 转换为numpy数组 + frame_np = frame.to_ndarray(format='bgr24') + + frames_with_pts.append({ + 'frame': frame_np, + 'pts_ms': pts_ms, + 'timestamp': time.time() # 当前系统时间 + }) + + container.close() + return frames_with_pts + + except Exception as e: + logger.error(f"[ERROR] Failed to read TS file {ts_file}: {e}") + return [] + + def initialize_playback(self): + """初始化播放:找到第3大的TS分片,建立时间基准""" + while not self.stop_event.is_set(): + # 查找第3大的TS分片 + third_largest = self.find_nth_largest_segment(3) + + if third_largest is not None: + segment_num = self.get_segment_number(third_largest) + logger.info(f"[INFO] Found segment {segment_num}, starting playback") + + # 读取该分片的所有帧 + frames = self.read_ts_frames(third_largest) + if frames: + # 设置基准时间 + self.current_segment_num = segment_num + self.start_time = time.time() + self.base_pts = frames[0]['pts_ms'] + return frames + else: + logger.warning(f"[WARN] No frames found in segment {segment_num}") + + # 等待0.5秒后重试 + time.sleep(0.5) + logger.warning(f"[WARN] No frames found, waiting...") + + return [] + + def find_next_segment(self): + """查找下一个TS分片""" + next_segment_num = self.current_segment_num + 1 + next_segment_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{next_segment_num:09d}.ts") + + # 检查+1, +2, +3是否存在 + check_segments = [ + next_segment_num + 0, # +1 + next_segment_num + 1, # +2 + next_segment_num + 2 # +3 + ] + + for seg_num in check_segments: + seg_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{seg_num:09d}.ts") + if not os.path.exists(seg_path): + return None + + # 所有检查的分片都存在,返回下一个分片 + return next_segment_path + + def run(self): + """主循环:模拟实时播放HLS流""" + # 初始化播放 + current_frames = self.initialize_playback() + if not current_frames: + logger.error("[ERROR] Failed to initialize playback") + return + + frame_index = 0 + + while not self.stop_event.is_set(): + if frame_index >= len(current_frames): + # 当前分片播放完毕,查找下一个分片 + next_segment = self.find_next_segment() + + if next_segment is not None: + # 读取下一个分片 + current_frames = self.read_ts_frames(next_segment) + if current_frames: + self.current_segment_num += 1 + frame_index = 0 + logger.info(f"[INFO] Switching to segment {self.current_segment_num}") + else: + logger.warning(f"[WARN] Failed to read next segment, waiting...") + time.sleep(0.5) + continue + else: + # 下一个分片不存在,等待 + time.sleep(0.5) + logger.warning(f"[WARN] No next segment found, waiting...") + continue + + # 获取当前帧 + frame_data = current_frames[frame_index] + frame = frame_data['frame'] + pts_ms = frame_data['pts_ms'] + + # 计算预期的播放时间 + expected_play_time = self.start_time + (pts_ms - self.base_pts) / 1000.0 + current_time = time.time() + + # 计算时间差 + time_diff = current_time - expected_play_time + + # 时间同步策略 + if time_diff > 0.02: # 超过20ms,播放落后 + # 跳过一些帧追赶 + skip_count = min(int(time_diff * RTSP_TARGET_FPS), len(current_frames) - frame_index - 1) + if skip_count > 0: + logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping {skip_count} frames") + frame_index += skip_count + continue + elif time_diff < -0.02: # 超前20ms,需要等待 + # 等待到正确的时间点 + wait_time = -time_diff + if wait_time > 0.1: # 如果等待时间过长,重置时间基准 + logger.info(f"[WARN] Too far ahead, resetting time base") + self.start_time = current_time + self.base_pts = pts_ms + else: + time.sleep(wait_time) + logger.info(f"[DEBUG] Waiting for {wait_time*1000:.1f}ms") + continue + + # 时间同步良好,放入队列 + item = { + "camera_id": self.camera_cfg.id, + "camera_name": self.camera_cfg.name, + "timestamp": current_time, + "frame": frame, + } + + try: + # 队列满时处理 + if self.raw_queue.full(): + try: + self.raw_queue.get_nowait() + self.raw_queue.task_done() + except queue.Empty: + pass + + self.raw_queue.put(item, timeout=0.5) + frame_index += 1 + + except queue.Full: + logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}") + + # 控制处理速度 + time.sleep(0.02) # 最小间隔 + + +# ========================= 服务主类 ========================= +class HLSKadianService: + def __init__(self, config_path: str = "config.yaml"): + with open(config_path, "r", encoding="utf-8") as f: + cfg = yaml.safe_load(f) + self.cameras = [CameraConfig(id=c["id"], name=c.get("name", f"cam_{c['id']}"), index=c["index"]) + for c in cfg.get("cameras", [])] + + self.stop_event = threading.Event() + self.raw_queue = queue.Queue(maxsize=3) + self.ws_queue = queue.Queue(maxsize=1000) + + self.capture_workers = [] + self.processor = FrameProcessorWorker(self.raw_queue, self.ws_queue, self.stop_event) + self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, WS_HOST, WS_PORT) + + def start(self): + # 确保TS分片目录存在 + os.makedirs(HLS_SEGMENT_DIR, exist_ok=True) + + self.ws_sender.start() + self.processor.start() + for cam in self.cameras: + w = HLSCaptureWorker(cam, self.raw_queue, self.stop_event) + w.start() + self.capture_workers.append(w) + logger.info("[INFO] HLS Kadian Service started") + + def stop(self): + self.stop_event.set() + self.raw_queue.join() + self.ws_queue.join() + for w in self.capture_workers: + w.join(timeout=2.0) + self.processor.join(timeout=2.0) + self.ws_sender.join(timeout=2.0) + logger.info("[INFO] Service stopped") + + +if __name__ == "__main__": + service = HLSKadianService("config.yaml") + service.start() + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + service.stop() \ No newline at end of file