Files
SupervisorAI/hls_service_ws_kadian.py

342 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

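# hls_service_ws_kadian.py
# Checkpoint ("kadian") detection service driven by HLS TS segments.
# Reads frames from local TS segments and paces them by their PTS intervals to simulate a live stream.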
import av
import os
import time
import threading
import queue
import yaml
import glob
from dataclasses import dataclass
from biz.checkpoint.checkpoint_biz import KadianDetector, RTSP_TARGET_FPS, ALERT_PUSH_INTERVAL, FrameProcessorWorker
from utils.web_socket_sender import WebSocketSender
from utils.logger import get_logger
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Host and port handed to WebSocketSender (presumably its listen address - confirm in utils.web_socket_sender).
WS_HOST = "0.0.0.0"
WS_PORT = 8765
# HLS configuration
HLS_SEGMENT_DIR = "D:\\ProjectDoc\\Police\\data\\hls" # Directory where the TS segments are stored
# NOTE(review): not referenced in the visible part of this file; segment names are built with an f-string instead.
HLS_SEGMENT_PATTERN = "segment_%09d.ts" # Naming pattern of the TS files
# ========================= 数据结构 =========================
@dataclass
class CameraConfig:
    """One camera entry as loaded from the service configuration file."""

    # Camera identifier; copied into every queued frame item as "camera_id".
    id: int
    # Display name; copied into queued frame items as "camera_name" and used in log messages.
    name: str
    # Value of the "index" key of the camera's config entry; not read elsewhere in this file.
    index: str
# ========================= 合并的TS读取和帧处理线程 =========================
class HLSFrameProcessor(threading.Thread):
    """Thread that replays local HLS TS segments as a PTS-paced frame stream.

    The thread starts at the third-newest segment in the segment directory,
    decodes every video frame with PyAV, sleeps so that frames are emitted at
    the pace given by their PTS, and pushes each frame into ``raw_queue``.
    """

    def __init__(self, camera_cfg: "CameraConfig", raw_queue: queue.Queue,
                 stop_event: threading.Event, segment_dir=None):
        """Create the processor.

        Args:
            camera_cfg: Camera whose id/name are attached to every frame item.
            raw_queue: Bounded queue that receives the decoded frame items.
            stop_event: Event that asks the thread to stop.
            segment_dir: Directory holding the ``segment_*.ts`` files. Defaults
                to the module-level ``HLS_SEGMENT_DIR``.
        """
        super().__init__(daemon=True)
        self.camera_cfg = camera_cfg
        self.raw_queue = raw_queue
        self.stop_event = stop_event
        self.segment_dir = HLS_SEGMENT_DIR if segment_dir is None else segment_dir
        # HLS state
        self.current_segment_num = -1  # number of the segment being processed
        self.start_time = None  # wall-clock time matching base_pts
        self.base_pts = None  # PTS (ms) that the time axis is anchored to
        # Timing bookkeeping (kept for compatibility; only last_frame_pts is written)
        self.last_process_time = None
        self.last_frame_pts = None
        self.pts_diff = 0
        self.should_reset_time = False  # re-anchor the time axis on the next frame

    @staticmethod
    def _parse_segment_number(path):
        """Return the numeric suffix of a segment path, or None if it is not a number."""
        try:
            return int(path.rsplit('_', 1)[-1].split('.', 1)[0])
        except ValueError:
            return None

    @staticmethod
    def _backoff_delay(failures):
        """Return the polling delay in seconds after ``failures`` consecutive misses."""
        if failures <= 10:
            return 0.02  # failures 1-10
        if failures <= 20:
            return 0.05  # failures 11-20
        return 0.5  # more than 20 failures

    def _segment_path(self, segment_num):
        """Build the path of the segment with the given number."""
        return os.path.join(self.segment_dir, f"segment_{segment_num:09d}.ts")

    def find_segment_files(self):
        """Return the paths of all TS segments, sorted by ascending segment number.

        Files that match ``segment_*.ts`` but have a non-numeric suffix are
        skipped instead of raising ``ValueError``.
        """
        pattern = os.path.join(self.segment_dir, "segment_*.ts")
        numbered = []
        for path in glob.glob(pattern):
            number = self._parse_segment_number(path)
            if number is not None:
                numbered.append((number, path))
        numbered.sort(key=lambda pair: pair[0])
        return [path for _, path in numbered]

    def get_segment_number(self, filename):
        """Extract the segment number from a file name (raises ValueError if malformed)."""
        return int(filename.split('_')[-1].split('.')[0])

    def find_nth_largest_segment(self, n):
        """Return the path of the segment with the n-th largest number, or None.

        None is returned when fewer than ``n`` segments exist or ``n`` < 1.
        """
        files = self.find_segment_files()
        if n < 1 or n > len(files):
            return None
        return files[-n]

    def find_next_segment(self):
        """Return the path of the next segment once it and its two successors exist.

        The segments numbered current+1, current+2 and current+3 must all be
        present; otherwise None is returned.
        """
        next_segment_num = self.current_segment_num + 1
        for seg_num in range(next_segment_num, next_segment_num + 3):
            if not os.path.exists(self._segment_path(seg_num)):
                return None
        return self._segment_path(next_segment_num)

    def initialize_playback(self):
        """Wait for the third-newest segment and start playback from it.

        Returns:
            The path of the starting segment, or None if the stop event was set
            before three segments became available.
        """
        while not self.stop_event.is_set():
            third_largest = self.find_nth_largest_segment(3)
            if third_largest is not None:
                segment_num = self.get_segment_number(third_largest)
                logger.info(f"[INFO] Found segment {segment_num}, starting playback")
                self.current_segment_num = segment_num
                self.start_time = time.time()
                # A stale base_pts from an earlier run must not be reused.
                self.should_reset_time = True
                return third_largest
            logger.warning(f"[WARN] No segments found, waiting...")
            # Retry every 0.5 s, waking up early when asked to stop.
            self.stop_event.wait(0.5)
        return None

    def process_frame_with_rate_control(self, frame_data, frame_pts):
        """Pace one frame by its PTS and push it into the raw queue.

        Args:
            frame_data: The decoded frame (passed through unchanged).
            frame_pts: Presentation timestamp of the frame in milliseconds.

        Returns:
            True if the frame was queued, False if it was dropped because the
            queue stayed full.
        """
        if self.start_time is None:
            self.start_time = time.time()
        if self.base_pts is None:
            self.base_pts = frame_pts
            self.last_frame_pts = frame_pts
            logger.info("[INFO] Frame processor initialized time base")

        # Wall-clock time at which this frame is due.
        expected_play_time = self.start_time + (frame_pts - self.base_pts) / 1000.0
        current_time = time.time()
        wait_time = expected_play_time - current_time

        if wait_time > 1:
            # More than a second early: the time axis is off, re-anchor it here.
            logger.warning(f"[WARN] Too far ahead, resetting time base")
            self.start_time = current_time
            self.base_pts = frame_pts
        elif wait_time > 0:
            self.stop_event.wait(wait_time)
        elif wait_time < -1:
            # More than a second late: re-anchor instead of trying to catch up.
            logger.warning(f"[WARN] Too far behind, resetting time base")
            self.start_time = current_time
            self.base_pts = frame_pts

        item = {
            "camera_id": self.camera_cfg.id,
            "camera_name": self.camera_cfg.name,
            "timestamp": current_time,
            "frame": frame_data,
        }
        try:
            # When the queue is full, drop the oldest item to make room.
            if self.raw_queue.full():
                try:
                    self.raw_queue.get_nowait()
                    self.raw_queue.task_done()
                except queue.Empty:
                    pass
            self.raw_queue.put(item, timeout=0.5)
            return True
        except queue.Full:
            logger.warning(f"[WARN] Raw queue full, dropping frame from {self.camera_cfg.name}")
            return False

    def run(self):
        """Thread entry point: play segments until stopped, restarting after errors.

        An unexpected error restarts playback after 3 seconds. The restart is a
        loop rather than a recursive call, so repeated failures cannot exhaust
        the stack, and a set stop event ends the thread.
        """
        while not self.stop_event.is_set():
            try:
                self._play_until_stopped()
                return
            except Exception as e:
                logger.error(f"[ERROR] HLSFrameProcessor main error: {e}")
                logger.info("[INFO] HLSFrameProcessor will restart in 3 seconds...")
                self.stop_event.wait(3)

    def _play_until_stopped(self):
        """Play the initial segment, then follow new segments until the stop event is set."""
        initial_segment = self.initialize_playback()
        if not initial_segment:
            logger.error("[ERROR] Failed to initialize playback")
            return
        self.process_segment_with_rate_control(initial_segment)

        # NOTE(review): if segment current+1 is ever deleted before it is read,
        # this loop polls forever; confirm whether old segments get removed.
        consecutive_failures = 0
        while not self.stop_event.is_set():
            try:
                next_segment = self.find_next_segment()
                if next_segment is not None:
                    consecutive_failures = 0
                    logger.info(f"[INFO] Starting to process TS segment: {next_segment}")
                    self.current_segment_num += 1
                    self.process_segment_with_rate_control(next_segment)
                    logger.info(f"[INFO] Finished processing segment {self.current_segment_num}")
                else:
                    # Nothing to play yet: back off, and re-anchor the time axis
                    # on the next segment because playback has stalled.
                    consecutive_failures += 1
                    self.should_reset_time = True
                    sleep_time = self._backoff_delay(consecutive_failures)
                    logger.warning(f"[WARN] No next segment found (failures: {consecutive_failures}), waiting {sleep_time}s...")
                    self.stop_event.wait(sleep_time)
            except Exception as e:
                logger.error(f"[ERROR] HLSFrameProcessor loop error: {e}")
                self.stop_event.wait(1)  # pause for a second before retrying

    def process_segment_with_rate_control(self, segment_path):
        """Decode one TS segment and emit its frames at PTS pace.

        The time axis is re-anchored on the first frame that has a PTS when no
        base exists yet or a reset was requested. Errors are logged and not
        propagated; the container is closed on every path.
        """
        try:
            with av.open(segment_path) as container:
                video_stream = container.streams.video[0]
                time_base = video_stream.time_base
                for packet in container.demux(video_stream):
                    for frame in packet.decode():
                        if self.stop_event.is_set():
                            return
                        if frame.pts is None:
                            continue
                        # PTS in milliseconds.
                        pts_ms = float(frame.pts * time_base) * 1000
                        if self.base_pts is None or self.should_reset_time:
                            self.start_time = time.time()
                            self.base_pts = pts_ms
                            self.should_reset_time = False
                            logger.info(f"[INFO] Time axis reset: start_time={self.start_time}, base_pts={self.base_pts:.1f}")
                        # Convert to a BGR numpy array before queueing.
                        frame_np = frame.to_ndarray(format='bgr24')
                        self.process_frame_with_rate_control(frame_np, pts_ms)
        except Exception as e:
            logger.error(f"[ERROR] Failed to process TS file {segment_path}: {e}")
# ========================= 服务主类 =========================
class HLSKadianService:
    """Wires the HLS frame processors, the business worker and the WebSocket sender together."""

    def __init__(self, config_path: str = "config.yaml"):
        """Load the camera list from a YAML file and build the queues and workers.

        Args:
            config_path: Path of the YAML configuration. Its optional
                ``cameras`` list holds entries with ``id``, ``index`` and an
                optional ``name``.
        """
        with open(config_path, "r", encoding="utf-8") as f:
            # safe_load returns None for an empty document.
            cfg = yaml.safe_load(f) or {}
        self.cameras = [
            CameraConfig(id=c["id"], name=c.get("name", f"cam_{c['id']}"), index=c["index"])
            for c in cfg.get("cameras") or []
        ]
        self.stop_event = threading.Event()
        self.raw_queue = queue.Queue(maxsize=3)  # raw frames; deliberately small
        self.ws_queue = queue.Queue(maxsize=1000)  # items for the WebSocket sender
        self.frame_processor_workers = []
        self.biz_processor = FrameProcessorWorker(self.raw_queue, self.ws_queue, self.stop_event)
        self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, WS_HOST, WS_PORT)

    @staticmethod
    def _drain_queue(q):
        """Discard every item left in ``q``, marking each one as done."""
        while True:
            try:
                q.get_nowait()
            except queue.Empty:
                return
            q.task_done()

    def start(self):
        """Start the sender, the business worker and one frame processor per camera."""
        # Make sure the TS segment directory exists.
        os.makedirs(HLS_SEGMENT_DIR, exist_ok=True)
        self.ws_sender.start()
        self.biz_processor.start()
        # One thread per camera does both the TS reading and the rate control.
        # NOTE(review): every processor gets the same raw_queue and the default
        # segment directory; confirm this is intended for multiple cameras.
        for cam in self.cameras:
            frame_processor = HLSFrameProcessor(cam, self.raw_queue, self.stop_event)
            frame_processor.start()
            self.frame_processor_workers.append(frame_processor)
        logger.info("[INFO] HLS Kadian Service started (merged TS reader and frame processor)")

    def stop(self):
        """Signal all threads to stop and wait briefly for each of them.

        The queues are drained instead of joined: ``Queue.join()`` has no
        timeout, so joining after the stop event is set would block forever if
        a consumer exits while items are still queued.
        """
        self.stop_event.set()
        for w in self.frame_processor_workers:
            w.join(timeout=2.0)
        self.biz_processor.join(timeout=2.0)
        self.ws_sender.join(timeout=2.0)
        self._drain_queue(self.raw_queue)
        self._drain_queue(self.ws_queue)
        logger.info("[INFO] Service stopped")
if __name__ == "__main__":
    # Script entry point: build the service from config.yaml and start its threads.
    service = HLSKadianService("config.yaml")
    service.start()
    try:
        # Park the main thread; the work happens in the threads started above.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C: ask the service to shut down.
        service.stop()