361 lines
15 KiB
Python
361 lines
15 KiB
Python
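# hls_service_kadian.py
# Checkpoint ("kadian") detection service driven by HLS TS segments.
# Reads frames from local TS segments and paces them by their PTS intervals
# so that the recorded segments behave like a real-time stream.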
|
||
|
||
import av
|
||
import os
|
||
import time
|
||
import threading
|
||
import queue
|
||
import glob
|
||
import argparse
|
||
import sys
|
||
|
||
from dataclasses import dataclass
|
||
|
||
# from biz.checkpoint.checkpoint_biz import FrameProcessorWorker
|
||
# from biz.prison.prison_biz import FrameProcessorWorker
|
||
# from biz.prison.trajectory02_biz import FrameProcessorWorker
|
||
# from biz.prison.supervision_room_biz import FrameProcessorWorker
|
||
|
||
from common.processor_factory import get_processor
|
||
|
||
from common.camera_config import CameraConfig, parse_cameras_from_json, parse_cameras_from_yaml
|
||
from common import constants
|
||
from utils.web_socket_sender import WebSocketSender
|
||
from utils.logger import get_logger
|
||
from utils.hls_utils import get_latest_n_segments as get_latest_segments
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
# ========================= 合并的TS读取和帧处理线程 =========================
|
||
class HLSFrameProcessor(threading.Thread):
    """Per-camera thread that replays local HLS TS segments as a paced stream.

    The thread looks up TS segments under ``<hls_root_path>/<camera index>``,
    decodes them with PyAV and pushes every decoded frame into ``raw_queue``.
    Frames are released according to their PTS so that consumers observe
    roughly real-time pacing.
    """

    def __init__(self, camera_cfg: "CameraConfig", hls_root_path: str, raw_queue: queue.Queue, stop_event: threading.Event):
        """Store the collaborators and initialise all playback state.

        Args:
            camera_cfg: Camera description; ``index``, ``name`` and ``id`` are read.
            hls_root_path: Directory that contains one sub-directory per camera index.
            raw_queue: Queue that receives the decoded frame items.
            stop_event: Event that asks the thread to stop.
        """
        super().__init__(daemon=True)
        self.camera_cfg = camera_cfg
        self.hls_root_path = hls_root_path
        self.raw_queue = raw_queue
        self.stop_event = stop_event

        # All state is initialised BEFORE the index check below so that the
        # object is never left half-constructed.
        # HLS state
        self.current_segment_path = None  # path of the TS segment being processed
        self.start_time = None  # wall-clock time that corresponds to base_pts
        self.base_pts = None  # PTS (ms) used as the origin of the time axis

        # Time synchronisation bookkeeping
        self.last_process_time = None  # time of the previous processing step
        self.last_frame_pts = None  # PTS of the previous frame
        self.pts_diff = 0

        # When True, the next decoded frame re-anchors the time axis.
        self.should_reset_time = False

        # Camera index code; it names the camera's segment directory.
        self.index_code = camera_cfg.index
        self.camera_root_dir = None
        if not self.index_code:
            logger.error(f"[ERROR] Camera {camera_cfg.name} has no index_code")
            return

        # Root directory that holds this camera's TS segments.
        self.camera_root_dir = os.path.join(hls_root_path, self.index_code)

    def get_latest_n_segments(self, n: int) -> list:
        """Return the latest ``n`` TS segments (delegates to the hls_utils helper).

        NOTE(review): callers index the result from the end to reach the newest
        entries, so the helper presumably returns paths ordered oldest to
        newest -- confirm against utils.hls_utils.
        """
        return get_latest_segments(self.camera_root_dir, n)

    def get_segment_number(self, filename):
        """Extract the numeric sequence number from a segment file name.

        The number is the text between the last ``_`` and the following ``.``
        of the base name, e.g. ``cam_000123.ts`` -> ``123``.

        Raises:
            ValueError: If that part of the name is not an integer.
        """
        basename = os.path.basename(filename)
        return int(basename.split('_')[-1].split('.')[0])

    def find_nth_largest_segment(self, n):
        """Return the n-th entry counted from the end of the latest segments.

        Returns ``None`` when fewer than ``n`` segments are available.
        """
        # Ask for a few extra entries so there is some slack.
        segments = self.get_latest_n_segments(n + 2)
        if len(segments) < n:
            return None
        # n-th from the end, i.e. the oldest of the latest n entries.
        return segments[-n]

    def find_next_segment(self):
        """Return the next TS segment to play, or ``None`` if we must wait."""
        latest_segments = self.get_latest_n_segments(10)

        if not latest_segments:
            return None

        # Case 1: the current segment is still listed -> advance by one.
        if self.current_segment_path in latest_segments:
            current_index = latest_segments.index(self.current_segment_path)
            # The "+ 3" keeps playback at the third-latest segment at most,
            # matching the start position chosen by initialize_playback().
            if current_index + 3 < len(latest_segments):
                return latest_segments[current_index + 1]
            # Advancing would move closer to the newest entry than allowed: wait.
            return None

        # Case 2: the current segment is no longer listed -> re-synchronise on
        # the third entry from the end.
        if len(latest_segments) >= 3:
            logger.warning(f"[WARN] Current segment not in latest list, re-syncing")
            return latest_segments[-3]
        # Not enough segments yet: wait.
        return None

    def initialize_playback(self):
        """Wait for the third-latest TS segment and make it the start point.

        Returns:
            The chosen segment path, or ``None`` if ``stop_event`` was set
            before enough segments became available.
        """
        while not self.stop_event.is_set():
            third_largest = self.find_nth_largest_segment(3)

            if third_largest is not None:
                logger.info(f"[INFO] Found segment: {os.path.basename(third_largest)}, starting playback")

                self.current_segment_path = third_largest
                self.start_time = time.time()
                # A new start segment needs a fresh time axis; this matters
                # when playback is re-initialised after an error.
                self.should_reset_time = True
                return third_largest

            logger.warning(f"[WARN] No segments found in {self.camera_root_dir}, waiting...")
            # Retry after 0.5 s, but wake up immediately on shutdown.
            self.stop_event.wait(0.5)

        return None

    def process_frame_with_rate_control(self, frame_data, frame_pts):
        """Pace one frame according to its PTS and enqueue it.

        Args:
            frame_data: Decoded frame payload, stored under ``"frame"``.
            frame_pts: Presentation timestamp of the frame in milliseconds.

        Returns:
            ``True`` if the frame was queued, ``False`` if it was dropped
            because the queue stayed full.
        """
        # Establish the time base on first use.
        if self.base_pts is None:
            self.base_pts = frame_pts
            self.last_frame_pts = frame_pts
            logger.info("[INFO] Frame processor initialized time base")
        if self.start_time is None:
            # Guard against being called before initialize_playback().
            self.start_time = time.time()

        # Wall-clock time at which this frame is due.
        expected_play_time = self.start_time + (frame_pts - self.base_pts) / 1000.0
        current_time = time.time()
        wait_time = expected_play_time - current_time

        if wait_time > 1 or wait_time < -1:
            # More than one second off schedule: re-anchor the time axis on
            # this frame instead of sleeping or trying to catch up.
            if wait_time > 0:
                logger.info(f"[WARN] Too far ahead, resetting time base")
            else:
                logger.info(f"[WARN] Too far behind, resetting time base")
            self.start_time = current_time
            self.base_pts = frame_pts
        elif wait_time > 0:
            time.sleep(wait_time)

        item = {
            "camera_id": self.camera_cfg.id,
            "camera_name": self.camera_cfg.name,
            # Time sampled before any pacing sleep above.
            "timestamp": current_time,
            "camera_index": self.camera_cfg.index,
            "frame": frame_data,
        }

        try:
            # When the queue is full, discard the oldest item to make room.
            if self.raw_queue.full():
                try:
                    self.raw_queue.get_nowait()
                    self.raw_queue.task_done()
                except queue.Empty:
                    pass

            self.raw_queue.put(item, timeout=0.5)
            return True

        except queue.Full:
            logger.warning(f"[WARN] Raw queue full, dropping frame from {self.camera_cfg.name}")
            return False

    def run(self):
        """Thread entry point: play segments until stopped, restarting on errors.

        A failed playback loop is restarted after 3 seconds. The restart is an
        ordinary loop iteration (not a recursive call), so repeated failures
        cannot exhaust the call stack, and no restart happens once
        ``stop_event`` is set.
        """
        if not self.index_code:
            # Without an index code there is no segment directory to read.
            logger.error(f"[ERROR] Camera {self.camera_cfg.name} has no index_code")
            return

        while True:
            try:
                self._playback_loop()
                return
            except Exception as e:
                logger.error(f"[ERROR] HLSFrameProcessor main error: {e}")
                logger.info("[INFO] HLSFrameProcessor will restart in 3 seconds...")
                if self.stop_event.wait(3):
                    return

    def _playback_loop(self):
        """Initialise playback, then process segments until ``stop_event`` is set."""
        initial_segment = self.initialize_playback()
        if not initial_segment:
            logger.error("[ERROR] Failed to initialize playback")
            return

        # Process the start segment.
        self.process_segment_with_rate_control(initial_segment)

        # Number of consecutive polls that found no next segment.
        consecutive_failures = 0

        while not self.stop_event.is_set():
            try:
                next_segment = self.find_next_segment()

                if next_segment is not None:
                    consecutive_failures = 0

                    logger.info(f"[INFO] Starting to process TS segment: {os.path.basename(next_segment)}")
                    self.current_segment_path = next_segment

                    self.process_segment_with_rate_control(next_segment)

                    logger.info(f"[INFO] Finished processing segment: {os.path.basename(next_segment)}")
                else:
                    # No next segment yet: back off depending on how long we
                    # have been waiting.
                    consecutive_failures += 1

                    # We idled, so the next segment must re-anchor the time axis.
                    self.should_reset_time = True

                    if consecutive_failures <= 10:
                        sleep_time = 0.02  # failures 1-10
                    elif consecutive_failures <= 20:
                        sleep_time = 0.05  # failures 11-20
                    else:
                        sleep_time = 0.5  # more than 20 failures

                    logger.warning(f"[WARN] No next segment found (failures: {consecutive_failures}), waiting {sleep_time}s...")
                    time.sleep(sleep_time)

            except Exception as e:
                logger.error(f"[ERROR] HLSFrameProcessor loop error: {e}")
                time.sleep(1)  # pause for a second before continuing

    def process_segment_with_rate_control(self, segment_path):
        """Decode one TS segment and feed its frames through the rate control.

        The segment is opened once. The time axis is re-anchored on the first
        frame that carries a PTS when no base exists yet or a reset was
        requested. Errors are logged and swallowed so that one bad segment
        does not stop playback.

        Args:
            segment_path: Path of the TS segment to decode.
        """
        try:
            # The context manager closes the container even when decoding fails.
            with av.open(segment_path) as container:
                video_stream = container.streams.video[0]
                time_base = video_stream.time_base

                for packet in container.demux(video_stream):
                    for frame in packet.decode():
                        if self.stop_event.is_set():
                            # Shutting down: do not decode the rest of the segment.
                            return
                        if frame.pts is None:
                            continue

                        # PTS in milliseconds.
                        pts_ms = float(frame.pts * time_base) * 1000

                        if self.base_pts is None or self.should_reset_time:
                            # Re-anchor the time axis on this frame.
                            self.start_time = time.time()
                            self.base_pts = pts_ms
                            self.should_reset_time = False
                            logger.info(f"[INFO] Time axis reset: start_time={self.start_time}, base_pts={self.base_pts:.1f}")

                        # Convert to a numpy array in BGR byte order.
                        frame_np = frame.to_ndarray(format='bgr24')

                        self.process_frame_with_rate_control(frame_np, pts_ms)

        except Exception as e:
            logger.error(f"[ERROR] Failed to process TS file {segment_path}: {e}")
|
||
|
||
|
||
|
||
|
||
|
||
# ========================= 服务主类 =========================
|
||
class HLSKadianService:
    """Wires the HLS frame readers, the business processor and the WebSocket sender.

    One ``HLSFrameProcessor`` thread is started per camera that has an index.
    All of them feed ``raw_queue``; the business processor and the WebSocket
    sender are constructed with ``raw_queue`` / ``ws_queue`` and the shared
    ``stop_event``.
    """

    def __init__(self, cameras: "list[CameraConfig]", hls_root_path: str, ws_host: str = "0.0.0.0", ws_port: int = 8765, algorithm: str = ""):
        """Create the queues and worker objects; nothing is started yet.

        Args:
            cameras: Camera configurations to serve.
            hls_root_path: Directory that contains one sub-directory per camera index.
            ws_host: Host passed to the WebSocket sender.
            ws_port: Port passed to the WebSocket sender.
            algorithm: Algorithm name passed to ``get_processor``.
        """
        self.cameras = cameras
        self.hls_root_path = hls_root_path
        self.ws_host = ws_host
        self.ws_port = ws_port
        self.algorithm = algorithm

        self.stop_event = threading.Event()
        # Raw frame queue, deliberately small.
        self.raw_queue = queue.Queue(maxsize=3)
        # WebSocket queue, kept small so latency cannot accumulate.
        self.ws_queue = queue.Queue(maxsize=10)

        self.frame_processor_workers = []
        self.biz_processor = get_processor(self.algorithm)(self.raw_queue, self.ws_queue, self.stop_event, self.cameras)
        self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, self.ws_host, self.ws_port)

    def start(self):
        """Start the sender, the business processor and one reader per camera."""
        # Make sure the HLS root directory exists.
        os.makedirs(self.hls_root_path, exist_ok=True)

        self.ws_sender.start()
        self.biz_processor.start()

        # One combined TS-reading / rate-control thread per camera.
        for cam in self.cameras:
            if not cam.index:
                logger.warning(f"[WARN] Camera {cam.name} has no index_code, skipping")
                continue
            frame_processor = HLSFrameProcessor(cam, self.hls_root_path, self.raw_queue, self.stop_event)
            frame_processor.start()
            self.frame_processor_workers.append(frame_processor)

        logger.info("[INFO] HLS Kadian Service started (merged TS reader and frame processor)")

    def stop(self):
        """Signal all workers to stop and wait briefly for them to finish.

        The queues are drained instead of ``join()``-ed. ``Queue.join()`` has
        no timeout and would block forever if a consumer exits on
        ``stop_event`` while items are still unfinished.
        """
        self.stop_event.set()

        for w in self.frame_processor_workers:
            w.join(timeout=2.0)

        self.biz_processor.join(timeout=2.0)
        self.ws_sender.join(timeout=2.0)

        # Discard whatever is still queued so nothing lingers after shutdown.
        self._drain_queue(self.raw_queue)
        self._drain_queue(self.ws_queue)

        logger.info("[INFO] Service stopped")

    @staticmethod
    def _drain_queue(q) -> int:
        """Remove every item currently in ``q`` without blocking; return the count."""
        dropped = 0
        while True:
            try:
                q.get_nowait()
            except queue.Empty:
                return dropped
            # Keep the queue's unfinished-task counter consistent.
            q.task_done()
            dropped += 1
|
||
|
||
|
||
if __name__ == "__main__":
    # Command-line interface; options are registered in the same order as before.
    parser = argparse.ArgumentParser(description="HLS Service for Detection")
    for flag, options in (
        ("--cameras", dict(type=str, help="Cameras config in JSON format (or base64 encoded JSON)")),
        ("--config", dict(type=str, default="config.yaml", help="Path to config YAML file")),
        ("--ws-host", dict(type=str, default="0.0.0.0", help="WebSocket host")),
        ("--ws-port", dict(type=int, default=8765, help="WebSocket port")),
        ("--algorithm", dict(type=str, default="", help="Algorithm type")),
        ("--hls-root-path", dict(type=str, required=True, help="HLS root path for TS segments")),
    ):
        parser.add_argument(flag, **options)
    args = parser.parse_args()

    # Initialise the global configuration.
    constants.init_config(args.config)
    constants.HLS_ROOT_PATH = args.hls_root_path

    # Cameras given on the command line take precedence over the config file.
    if not args.cameras:
        cameras = parse_cameras_from_yaml(args.config)
        logger.info(f"[INFO] Loaded {len(cameras)} cameras from config file: {args.config}")
    else:
        cameras = parse_cameras_from_json(args.cameras)
        logger.info(f"[INFO] Loaded {len(cameras)} cameras from command line argument")

    if not cameras:
        logger.error("[ERROR] No cameras configured, exiting...")
        sys.exit(1)

    service = HLSKadianService(
        cameras,
        hls_root_path=args.hls_root_path,
        ws_host=args.ws_host,
        ws_port=args.ws_port,
        algorithm=args.algorithm,
    )
    service.start()

    # Keep the main thread alive until Ctrl+C, then shut the service down.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        service.stop()
|