575 lines
23 KiB
Python
575 lines
23 KiB
Python
# hls_service_kadian.py
|
||
# 基于HLS TS分片的卡点检测服务
|
||
# 支持从本地TS分片读取帧,按照PTS时间间隔模拟实时流
|
||
|
||
import av
|
||
import os
|
||
import time
|
||
import threading
|
||
import queue
|
||
import yaml
|
||
import glob
|
||
|
||
from dataclasses import dataclass
|
||
|
||
from biz.checkpoint.checkpoint_biz import KadianDetector, RTSP_TARGET_FPS, ALERT_PUSH_INTERVAL, FrameProcessorWorker
|
||
from utils.web_socket_sender import WebSocketSender
|
||
from utils.logger import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
WS_HOST = "0.0.0.0"
|
||
WS_PORT = 8765
|
||
|
||
# HLS配置
|
||
HLS_SEGMENT_DIR = "D:\\ProjectDoc\\Police\\data\\hls" # TS分片存储目录
|
||
HLS_SEGMENT_PATTERN = "segment_%09d.ts" # TS文件命名模式
|
||
|
||
|
||
# ========================= 数据结构 =========================
|
||
@dataclass
|
||
class CameraConfig:
|
||
id: int
|
||
name: str
|
||
index: str
|
||
|
||
|
||
# ========================= TS 文件读取线程 =========================
|
||
class TSReaderWorker(threading.Thread):
|
||
def __init__(self, camera_cfg: CameraConfig, frame_queue: queue.Queue, stop_event: threading.Event):
|
||
super().__init__(daemon=True)
|
||
self.camera_cfg = camera_cfg
|
||
self.frame_queue = frame_queue
|
||
self.stop_event = stop_event
|
||
|
||
# HLS状态变量
|
||
self.current_segment_num = -1 # 当前处理的TS分片序号
|
||
self.start_time = None # 播放开始时间
|
||
self.base_pts = None # 第一个帧的PTS基准
|
||
|
||
# 队列阈值控制
|
||
self.queue_threshold = 50 # 队列中帧数量阈值
|
||
|
||
def find_segment_files(self):
|
||
"""查找TS分片文件,返回排序后的文件名列表"""
|
||
pattern = os.path.join(HLS_SEGMENT_DIR, "segment_*.ts")
|
||
files = glob.glob(pattern)
|
||
# 按文件名中的数字排序
|
||
files.sort(key=lambda x: int(x.split('_')[-1].split('.')[0]))
|
||
return files
|
||
|
||
def get_segment_number(self, filename):
|
||
"""从文件名中提取序号"""
|
||
return int(filename.split('_')[-1].split('.')[0])
|
||
|
||
def find_nth_largest_segment(self, n):
|
||
"""查找第n大的TS分片"""
|
||
files = self.find_segment_files()
|
||
if len(files) < n:
|
||
return None
|
||
# 返回第n大的文件(从大到小排序)
|
||
files_sorted = sorted(files, key=self.get_segment_number, reverse=True)
|
||
return files_sorted[n-1] if n-1 < len(files_sorted) else None
|
||
|
||
def read_ts_frames(self, ts_file):
|
||
"""读取TS文件中的所有帧,返回帧列表和PTS信息"""
|
||
frames_with_pts = []
|
||
|
||
try:
|
||
container = av.open(ts_file)
|
||
video_stream = container.streams.video[0]
|
||
|
||
for packet in container.demux(video_stream):
|
||
for frame in packet.decode():
|
||
if frame.pts is not None:
|
||
# 计算PTS时间(毫秒)
|
||
pts_ms = frame.pts * video_stream.time_base * 1000
|
||
|
||
# 转换为numpy数组
|
||
frame_np = frame.to_ndarray(format='bgr24')
|
||
|
||
frames_with_pts.append({
|
||
'frame': frame_np,
|
||
'pts_ms': pts_ms,
|
||
'timestamp': time.time() # 当前系统时间
|
||
})
|
||
|
||
container.close()
|
||
return frames_with_pts
|
||
|
||
except Exception as e:
|
||
logger.error(f"[ERROR] Failed to read TS file {ts_file}: {e}")
|
||
return []
|
||
|
||
def initialize_playback(self):
|
||
"""初始化播放:找到第3大的TS分片,建立时间基准"""
|
||
while not self.stop_event.is_set():
|
||
# 查找第3大的TS分片
|
||
third_largest = self.find_nth_largest_segment(3)
|
||
|
||
if third_largest is not None:
|
||
segment_num = self.get_segment_number(third_largest)
|
||
logger.info(f"[INFO] Found segment {segment_num}, starting playback")
|
||
|
||
# 读取该分片的所有帧
|
||
frames = self.read_ts_frames(third_largest)
|
||
if frames:
|
||
# 设置基准时间
|
||
self.current_segment_num = segment_num
|
||
self.start_time = time.time()
|
||
self.base_pts = frames[0]['pts_ms']
|
||
return frames
|
||
else:
|
||
logger.warning(f"[WARN] No frames found in segment {segment_num}")
|
||
|
||
# 等待0.5秒后重试
|
||
time.sleep(0.5)
|
||
logger.warning(f"[WARN] No frames found, waiting...")
|
||
|
||
return []
|
||
|
||
def find_next_segment(self):
|
||
"""查找下一个TS分片"""
|
||
next_segment_num = self.current_segment_num + 1
|
||
next_segment_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{next_segment_num:09d}.ts")
|
||
|
||
# 检查+1, +2, +3是否存在
|
||
check_segments = [
|
||
next_segment_num + 0, # +1
|
||
next_segment_num + 1, # +2
|
||
next_segment_num + 2 # +3
|
||
]
|
||
|
||
for seg_num in check_segments:
|
||
seg_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{seg_num:09d}.ts")
|
||
if not os.path.exists(seg_path):
|
||
return None
|
||
|
||
# 所有检查的分片都存在,返回下一个分片
|
||
return next_segment_path
|
||
|
||
def run(self):
|
||
"""主循环:读取TS文件并将帧放入队列"""
|
||
try:
|
||
# 初始化播放
|
||
current_frames = self.initialize_playback()
|
||
if not current_frames:
|
||
logger.error("[ERROR] Failed to initialize playback")
|
||
return
|
||
|
||
frame_index = 0
|
||
|
||
while not self.stop_event.is_set():
|
||
try:
|
||
# 检查队列大小,如果超过阈值则等待
|
||
if self.frame_queue.qsize() >= self.queue_threshold:
|
||
time.sleep(0.2)
|
||
continue
|
||
|
||
if frame_index >= len(current_frames):
|
||
# 当前分片播放完毕,查找下一个分片
|
||
next_segment = self.find_next_segment()
|
||
|
||
if next_segment is not None:
|
||
# 读取下一个分片
|
||
current_frames = self.read_ts_frames(next_segment)
|
||
if current_frames:
|
||
self.current_segment_num += 1
|
||
frame_index = 0
|
||
logger.info(f"[INFO] Switching to segment {self.current_segment_num}")
|
||
else:
|
||
logger.warning(f"[WARN] Failed to read next segment, waiting...")
|
||
time.sleep(0.5)
|
||
continue
|
||
else:
|
||
# 下一个分片不存在,等待
|
||
time.sleep(0.5)
|
||
logger.warning(f"[WARN] No next segment found, waiting...")
|
||
continue
|
||
|
||
# 获取当前帧
|
||
frame_data = current_frames[frame_index]
|
||
frame = frame_data['frame']
|
||
pts_ms = frame_data['pts_ms']
|
||
|
||
# 计算预期的播放时间
|
||
expected_play_time = self.start_time + (pts_ms - self.base_pts) / 1000.0
|
||
|
||
# 准备帧数据
|
||
item = {
|
||
"camera_id": self.camera_cfg.id,
|
||
"camera_name": self.camera_cfg.name,
|
||
"timestamp": expected_play_time, # 使用预期的播放时间
|
||
"frame": frame,
|
||
"pts_ms": pts_ms
|
||
}
|
||
|
||
try:
|
||
# 队列满时处理
|
||
if self.frame_queue.full():
|
||
try:
|
||
self.frame_queue.get_nowait()
|
||
self.frame_queue.task_done()
|
||
except queue.Empty:
|
||
pass
|
||
|
||
self.frame_queue.put(item, timeout=0.5)
|
||
frame_index += 1
|
||
|
||
except queue.Full:
|
||
logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"[ERROR] TSReaderWorker loop error: {e}")
|
||
time.sleep(1) # 出错后等待1秒再继续
|
||
|
||
except Exception as e:
|
||
logger.error(f"[ERROR] TSReaderWorker main error: {e}")
|
||
logger.info("[INFO] TSReaderWorker will restart in 3 seconds...")
|
||
time.sleep(3)
|
||
# 重新启动线程
|
||
self.run()
|
||
|
||
|
||
# ========================= 帧处理线程 =========================
|
||
class HLSFrameProcessor(threading.Thread):
|
||
def __init__(self, camera_cfg: CameraConfig, frame_queue: queue.Queue, raw_queue: queue.Queue, stop_event: threading.Event):
|
||
super().__init__(daemon=True)
|
||
self.camera_cfg = camera_cfg
|
||
self.frame_queue = frame_queue
|
||
self.raw_queue = raw_queue
|
||
self.stop_event = stop_event
|
||
|
||
# 时间同步相关
|
||
self.start_time = None
|
||
self.base_pts = None
|
||
|
||
def run(self):
|
||
"""主循环:从帧队列获取帧,按照PTS时间间隔放入raw_queue"""
|
||
try:
|
||
while not self.stop_event.is_set():
|
||
try:
|
||
# 从帧队列获取帧数据(阻塞等待)
|
||
frame_data = self.frame_queue.get(timeout=1.0)
|
||
|
||
# 初始化时间基准
|
||
if self.start_time is None:
|
||
self.start_time = time.time()
|
||
self.base_pts = frame_data['pts_ms']
|
||
logger.info("[INFO] Frame processor initialized time base")
|
||
|
||
# 计算预期的播放时间
|
||
expected_play_time = self.start_time + (frame_data['pts_ms'] - self.base_pts) / 1000.0
|
||
current_time = time.time()
|
||
|
||
# 计算时间差
|
||
time_diff = current_time - expected_play_time
|
||
|
||
# 时间同步策略
|
||
if time_diff > 0.02: # 超过20ms,播放落后
|
||
# 跳过这一帧追赶
|
||
logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping frame")
|
||
self.frame_queue.task_done()
|
||
continue
|
||
elif time_diff < -0.02: # 超前20ms,需要等待
|
||
# 等待到正确的时间点
|
||
wait_time = -time_diff
|
||
if wait_time > 0.1: # 如果等待时间过长,重置时间基准
|
||
logger.info(f"[WARN] Too far ahead, resetting time base")
|
||
self.start_time = current_time
|
||
self.base_pts = frame_data['pts_ms']
|
||
else:
|
||
time.sleep(wait_time)
|
||
logger.info(f"[DEBUG] Waiting for {wait_time*1000:.1f}ms")
|
||
|
||
# 时间同步良好,放入raw_queue
|
||
item = {
|
||
"camera_id": frame_data["camera_id"],
|
||
"camera_name": frame_data["camera_name"],
|
||
"timestamp": current_time,
|
||
"frame": frame_data["frame"],
|
||
}
|
||
|
||
try:
|
||
# raw_queue满时处理
|
||
if self.raw_queue.full():
|
||
try:
|
||
self.raw_queue.get_nowait()
|
||
self.raw_queue.task_done()
|
||
except queue.Empty:
|
||
pass
|
||
|
||
self.raw_queue.put(item, timeout=0.5)
|
||
|
||
except queue.Full:
|
||
logger.warning(f"[WARN] Raw queue full, dropping frame from {frame_data['camera_name']}")
|
||
|
||
# 标记帧处理完成
|
||
self.frame_queue.task_done()
|
||
|
||
except queue.Empty:
|
||
# 队列为空,继续等待
|
||
continue
|
||
except Exception as e:
|
||
logger.error(f"[ERROR] HLSFrameProcessor loop error: {e}")
|
||
time.sleep(1) # 出错后等待1秒再继续
|
||
|
||
except Exception as e:
|
||
logger.error(f"[ERROR] HLSFrameProcessor main error: {e}")
|
||
logger.info("[INFO] HLSFrameProcessor will restart in 3 seconds...")
|
||
time.sleep(3)
|
||
# 重新启动线程
|
||
self.run()
|
||
|
||
def find_segment_files(self):
|
||
"""查找TS分片文件,返回排序后的文件名列表"""
|
||
pattern = os.path.join(HLS_SEGMENT_DIR, "segment_*.ts")
|
||
files = glob.glob(pattern)
|
||
# 按文件名中的数字排序
|
||
files.sort(key=lambda x: int(x.split('_')[-1].split('.')[0]))
|
||
return files
|
||
|
||
def get_segment_number(self, filename):
|
||
"""从文件名中提取序号"""
|
||
return int(filename.split('_')[-1].split('.')[0])
|
||
|
||
def find_nth_largest_segment(self, n):
|
||
"""查找第n大的TS分片"""
|
||
files = self.find_segment_files()
|
||
if len(files) < n:
|
||
return None
|
||
# 返回第n大的文件(从大到小排序)
|
||
files_sorted = sorted(files, key=self.get_segment_number, reverse=True)
|
||
return files_sorted[n-1] if n-1 < len(files_sorted) else None
|
||
|
||
def read_ts_frames(self, ts_file):
|
||
"""读取TS文件中的所有帧,返回帧列表和PTS信息"""
|
||
frames_with_pts = []
|
||
|
||
try:
|
||
container = av.open(ts_file)
|
||
video_stream = container.streams.video[0]
|
||
|
||
for packet in container.demux(video_stream):
|
||
for frame in packet.decode():
|
||
if frame.pts is not None:
|
||
# 计算PTS时间(毫秒)
|
||
pts_ms = frame.pts * video_stream.time_base * 1000
|
||
|
||
# 转换为numpy数组
|
||
frame_np = frame.to_ndarray(format='bgr24')
|
||
|
||
frames_with_pts.append({
|
||
'frame': frame_np,
|
||
'pts_ms': pts_ms,
|
||
'timestamp': time.time() # 当前系统时间
|
||
})
|
||
|
||
container.close()
|
||
return frames_with_pts
|
||
|
||
except Exception as e:
|
||
logger.error(f"[ERROR] Failed to read TS file {ts_file}: {e}")
|
||
return []
|
||
|
||
def initialize_playback(self):
|
||
"""初始化播放:找到第3大的TS分片,建立时间基准"""
|
||
while not self.stop_event.is_set():
|
||
# 查找第3大的TS分片
|
||
third_largest = self.find_nth_largest_segment(3)
|
||
|
||
if third_largest is not None:
|
||
segment_num = self.get_segment_number(third_largest)
|
||
logger.info(f"[INFO] Found segment {segment_num}, starting playback")
|
||
|
||
# 读取该分片的所有帧
|
||
frames = self.read_ts_frames(third_largest)
|
||
if frames:
|
||
# 设置基准时间
|
||
self.current_segment_num = segment_num
|
||
self.start_time = time.time()
|
||
self.base_pts = frames[0]['pts_ms']
|
||
return frames
|
||
else:
|
||
logger.warning(f"[WARN] No frames found in segment {segment_num}")
|
||
|
||
# 等待0.5秒后重试
|
||
time.sleep(0.5)
|
||
logger.warning(f"[WARN] No frames found, waiting...")
|
||
|
||
return []
|
||
|
||
def find_next_segment(self):
|
||
"""查找下一个TS分片"""
|
||
next_segment_num = self.current_segment_num + 1
|
||
next_segment_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{next_segment_num:09d}.ts")
|
||
|
||
# 检查+1, +2, +3是否存在
|
||
check_segments = [
|
||
next_segment_num + 0, # +1
|
||
next_segment_num + 1, # +2
|
||
next_segment_num + 2 # +3
|
||
]
|
||
|
||
for seg_num in check_segments:
|
||
seg_path = os.path.join(HLS_SEGMENT_DIR, f"segment_{seg_num:09d}.ts")
|
||
if not os.path.exists(seg_path):
|
||
return None
|
||
|
||
# 所有检查的分片都存在,返回下一个分片
|
||
return next_segment_path
|
||
|
||
def run(self):
|
||
"""主循环:模拟实时播放HLS流"""
|
||
# 初始化播放
|
||
current_frames = self.initialize_playback()
|
||
if not current_frames:
|
||
logger.error("[ERROR] Failed to initialize playback")
|
||
return
|
||
|
||
frame_index = 0
|
||
|
||
while not self.stop_event.is_set():
|
||
if frame_index >= len(current_frames):
|
||
# 当前分片播放完毕,查找下一个分片
|
||
next_segment = self.find_next_segment()
|
||
|
||
if next_segment is not None:
|
||
# 读取下一个分片
|
||
current_frames = self.read_ts_frames(next_segment)
|
||
if current_frames:
|
||
self.current_segment_num += 1
|
||
frame_index = 0
|
||
logger.info(f"[INFO] Switching to segment {self.current_segment_num}")
|
||
else:
|
||
logger.warning(f"[WARN] Failed to read next segment, waiting...")
|
||
time.sleep(0.5)
|
||
continue
|
||
else:
|
||
# 下一个分片不存在,等待
|
||
time.sleep(0.5)
|
||
logger.warning(f"[WARN] No next segment found, waiting...")
|
||
continue
|
||
|
||
# 获取当前帧
|
||
frame_data = current_frames[frame_index]
|
||
frame = frame_data['frame']
|
||
pts_ms = frame_data['pts_ms']
|
||
|
||
# 计算预期的播放时间
|
||
expected_play_time = self.start_time + (pts_ms - self.base_pts) / 1000.0
|
||
current_time = time.time()
|
||
|
||
# 计算时间差
|
||
time_diff = current_time - expected_play_time
|
||
|
||
# 时间同步策略
|
||
if time_diff > 0.02: # 超过20ms,播放落后
|
||
# 跳过一些帧追赶
|
||
skip_count = min(int(time_diff * RTSP_TARGET_FPS), len(current_frames) - frame_index - 1)
|
||
if skip_count > 0:
|
||
logger.info(f"[DEBUG] Lagging behind by {time_diff*1000:.1f}ms, skipping {skip_count} frames")
|
||
frame_index += skip_count
|
||
continue
|
||
elif time_diff < -0.02: # 超前20ms,需要等待
|
||
# 等待到正确的时间点
|
||
wait_time = -time_diff
|
||
if wait_time > 0.1: # 如果等待时间过长,重置时间基准
|
||
logger.info(f"[WARN] Too far ahead, resetting time base")
|
||
self.start_time = current_time
|
||
self.base_pts = pts_ms
|
||
else:
|
||
time.sleep(wait_time)
|
||
logger.info(f"[DEBUG] Waiting for {wait_time*1000:.1f}ms")
|
||
continue
|
||
|
||
# 时间同步良好,放入队列
|
||
item = {
|
||
"camera_id": self.camera_cfg.id,
|
||
"camera_name": self.camera_cfg.name,
|
||
"timestamp": current_time,
|
||
"frame": frame,
|
||
}
|
||
|
||
try:
|
||
# 队列满时处理
|
||
if self.raw_queue.full():
|
||
try:
|
||
self.raw_queue.get_nowait()
|
||
self.raw_queue.task_done()
|
||
except queue.Empty:
|
||
pass
|
||
|
||
self.raw_queue.put(item, timeout=0.5)
|
||
frame_index += 1
|
||
|
||
except queue.Full:
|
||
logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}")
|
||
|
||
# 控制处理速度
|
||
time.sleep(0.02) # 最小间隔
|
||
|
||
|
||
# ========================= 服务主类 =========================
|
||
class HLSKadianService:
|
||
def __init__(self, config_path: str = "config.yaml"):
|
||
with open(config_path, "r", encoding="utf-8") as f:
|
||
cfg = yaml.safe_load(f)
|
||
self.cameras = [CameraConfig(id=c["id"], name=c.get("name", f"cam_{c['id']}"), index=c["index"])
|
||
for c in cfg.get("cameras", [])]
|
||
|
||
self.stop_event = threading.Event()
|
||
self.frame_queue = queue.Queue(maxsize=50) # 帧队列,容量较大
|
||
self.raw_queue = queue.Queue(maxsize=3) # 原始队列,容量较小
|
||
self.ws_queue = queue.Queue(maxsize=1000) # WebSocket队列
|
||
|
||
self.ts_reader_workers = []
|
||
self.frame_processor_workers = []
|
||
self.biz_processor = FrameProcessorWorker(self.raw_queue, self.ws_queue, self.stop_event)
|
||
self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, WS_HOST, WS_PORT)
|
||
|
||
def start(self):
|
||
# 确保TS分片目录存在
|
||
os.makedirs(HLS_SEGMENT_DIR, exist_ok=True)
|
||
|
||
self.ws_sender.start()
|
||
self.biz_processor.start()
|
||
|
||
# 为每个摄像头启动TS读取线程和帧处理线程
|
||
for cam in self.cameras:
|
||
# 启动TS读取线程
|
||
ts_reader = TSReaderWorker(cam, self.frame_queue, self.stop_event)
|
||
ts_reader.start()
|
||
self.ts_reader_workers.append(ts_reader)
|
||
|
||
# 启动帧处理线程
|
||
frame_processor = HLSFrameProcessor(cam, self.frame_queue, self.raw_queue, self.stop_event)
|
||
frame_processor.start()
|
||
self.frame_processor_workers.append(frame_processor)
|
||
|
||
logger.info("[INFO] HLS Kadian Service started")
|
||
|
||
def stop(self):
|
||
self.stop_event.set()
|
||
self.frame_queue.join()
|
||
self.raw_queue.join()
|
||
self.ws_queue.join()
|
||
|
||
for w in self.ts_reader_workers:
|
||
w.join(timeout=2.0)
|
||
for w in self.frame_processor_workers:
|
||
w.join(timeout=2.0)
|
||
|
||
self.biz_processor.join(timeout=2.0)
|
||
self.ws_sender.join(timeout=2.0)
|
||
logger.info("[INFO] Service stopped")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
service = HLSKadianService("config.yaml")
|
||
service.start()
|
||
try:
|
||
while True:
|
||
time.sleep(1)
|
||
except KeyboardInterrupt:
|
||
service.stop() |