Files
SupervisorAI/hls_service_ws_kadian.py
2026-03-04 18:36:01 +08:00

404 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# hls_service_ws_kadian.py
# 基于HLS TS分片的卡点检测服务
# 支持从本地TS分片读取帧按照PTS时间间隔模拟实时流
import av
import os
import time
import threading
import queue
import glob
import argparse
import sys
from dataclasses import dataclass
# from biz.checkpoint.checkpoint_biz import FrameProcessorWorker
# from biz.prison.prison_biz import FrameProcessorWorker
# from biz.prison.trajectory02_biz import FrameProcessorWorker
# from biz.prison.supervision_room_biz import FrameProcessorWorker
from common.processor_factory import get_processor
from common.camera_config import CameraConfig, parse_cameras_from_json, parse_cameras_from_yaml
from common.constants import init_config
from utils.web_socket_sender import WebSocketSender
from utils.logger import get_logger
logger = get_logger(__name__)
# ========================= 合并的TS读取和帧处理线程 =========================
class HLSFrameProcessor(threading.Thread):
    """Read the HLS TS segments of one camera and feed decoded frames to a queue.

    Segments are looked up under
    ``<hls_root_path>/<camera index>/<timestamp folder>/segment_<n>.ts``.
    Each segment is decoded with PyAV and every frame is pushed into
    ``raw_queue`` as a BGR array, paced by its PTS so that segments read from
    disk are delivered at roughly real-time speed.
    """

    def __init__(self, camera_cfg: CameraConfig, hls_root_path: str, raw_queue: queue.Queue, stop_event: threading.Event):
        """Store the collaborators and initialise the playback state.

        Args:
            camera_cfg: Camera description; ``index``, ``id`` and ``name`` are read.
            hls_root_path: Directory that contains one sub-directory per camera index.
            raw_queue: Queue that receives one dict per decoded frame.
            stop_event: Set by the owner to ask the thread to stop.
        """
        super().__init__(daemon=True)
        self.camera_cfg = camera_cfg
        self.hls_root_path = hls_root_path
        self.raw_queue = raw_queue
        self.stop_event = stop_event
        self.index_code = camera_cfg.index

        # All state is initialised before the index check so that every
        # attribute exists even when the camera turns out to be misconfigured.
        self.camera_root_dir = None       # directory holding this camera's timestamp folders
        self.current_segment_path = None  # TS segment currently being played
        self.start_time = None            # wall-clock time matching base_pts
        self.base_pts = None              # PTS (ms) matching start_time
        self.last_process_time = None
        self.last_frame_pts = None
        self.pts_diff = 0
        self.should_reset_time = False    # re-anchor the time base on the next segment

        if not self.index_code:
            logger.error(f"[ERROR] Camera {camera_cfg.name} has no index_code")
            return
        self.camera_root_dir = os.path.join(hls_root_path, self.index_code)

    def get_latest_n_segments(self, n: int) -> list:
        """Return the paths of the newest ``n`` TS segments, oldest first.

        Timestamp folders are ordered by name (newest name last in time) and
        the segments inside a folder by their numeric suffix. Folders are
        visited from newest to oldest and older folders are only consulted
        while fewer than ``n`` segments have been collected.

        Args:
            n: Maximum number of segments to return.

        Returns:
            Up to ``n`` segment paths in chronological order; an empty list
            when ``n`` is not positive or the camera directory is unavailable.
        """
        if n <= 0 or not self.camera_root_dir:
            return []
        try:
            entries = os.listdir(self.camera_root_dir)
        except OSError:
            # Directory missing (or removed while we were looking): nothing to play yet.
            return []

        timestamp_folders = sorted(
            (name for name in entries
             if os.path.isdir(os.path.join(self.camera_root_dir, name))),
            reverse=True,
        )

        collected = []
        for ts_folder in timestamp_folders:
            pattern = os.path.join(self.camera_root_dir, ts_folder, "segment_*.ts")
            numbered = []
            for path in glob.glob(pattern):
                try:
                    numbered.append((self.get_segment_number(path), path))
                except ValueError:
                    # Not a numbered segment (e.g. a temp file); ignore it.
                    continue
            numbered.sort()
            # This folder is older than everything collected so far, so its
            # segments go in FRONT to keep the list chronological.
            collected = [path for _, path in numbered] + collected
            if len(collected) >= n:
                break
        return collected[-n:]

    def get_segment_number(self, filename):
        """Extract the numeric suffix from a ``segment_<n>.ts`` file name.

        Raises:
            ValueError: If the part between the last ``_`` and the first
                following ``.`` is not an integer.
        """
        basename = os.path.basename(filename)
        return int(basename.split('_')[-1].split('.')[0])

    def find_nth_largest_segment(self, n):
        """Return the n-th newest segment path, or ``None`` if fewer than n exist."""
        if n <= 0:
            return None
        segments = self.get_latest_n_segments(n + 2)  # fetch a couple extra as buffer
        if len(segments) < n:
            return None
        return segments[-n]

    def find_next_segment(self):
        """Return the segment to play after the current one, or ``None`` to wait.

        The reader deliberately stays behind the newest data: the successor
        of the current segment is only returned once it is at most the
        third-newest of the latest ten segments. If the current segment is no
        longer among the latest ten, playback re-synchronises on the
        third-newest segment.
        """
        latest_segments = self.get_latest_n_segments(10)
        if not latest_segments:
            return None

        if self.current_segment_path in latest_segments:
            current_index = latest_segments.index(self.current_segment_path)
            if current_index + 3 < len(latest_segments):
                return latest_segments[current_index + 1]
            return None  # not enough newer segments yet, wait

        if len(latest_segments) >= 3:
            logger.warning(f"[WARN] Current segment not in latest list, re-syncing")
            return latest_segments[-3]
        return None  # too few segments to re-sync, wait

    def initialize_playback(self):
        """Wait for the third-newest segment and anchor playback on it.

        Returns:
            The path of the starting segment, or ``None`` if the stop event
            was set before enough segments appeared.
        """
        while not self.stop_event.is_set():
            third_largest = self.find_nth_largest_segment(3)
            if third_largest is not None:
                logger.info(f"[INFO] Found segment: {os.path.basename(third_largest)}, starting playback")
                self.current_segment_path = third_largest
                self.start_time = time.time()
                return third_largest
            logger.warning(f"[WARN] No segments found in {self.camera_root_dir}, waiting...")
            # Event.wait instead of sleep so a stop request is honoured immediately.
            self.stop_event.wait(0.5)
        return None

    def process_frame_with_rate_control(self, frame_data, frame_pts):
        """Pace one frame by its PTS and enqueue it.

        Args:
            frame_data: Decoded frame, stored under the ``"frame"`` key.
            frame_pts: Presentation timestamp of the frame in milliseconds.

        Returns:
            ``True`` if the frame was queued, ``False`` if it was dropped
            because the queue stayed full.
        """
        if self.start_time is None:
            # Called without initialize_playback(): anchor on "now".
            self.start_time = time.time()
        if self.base_pts is None:
            self.base_pts = frame_pts
            self.last_frame_pts = frame_pts
            logger.info("[INFO] Frame processor initialized time base")

        # Wall-clock time at which this frame is due.
        expected_play_time = self.start_time + (frame_pts - self.base_pts) / 1000.0
        current_time = time.time()
        wait_time = expected_play_time - current_time

        if wait_time > 1:
            # More than a second early: the time base is stale, re-anchor instead of stalling.
            logger.warning(f"[WARN] Too far ahead, resetting time base")
            self.start_time = current_time
            self.base_pts = frame_pts
        elif wait_time > 0:
            time.sleep(wait_time)
        elif wait_time < -1:
            # More than a second late: re-anchor so we stop chasing the old schedule.
            logger.warning(f"[WARN] Too far behind, resetting time base")
            self.start_time = current_time
            self.base_pts = frame_pts

        item = {
            "camera_id": self.camera_cfg.id,
            "camera_name": self.camera_cfg.name,
            "timestamp": current_time,  # taken before any pacing sleep above
            "camera_index": self.camera_cfg.index,
            "frame": frame_data,
        }
        try:
            if self.raw_queue.full():
                # Make room by discarding the oldest pending frame; task_done
                # keeps the queue's unfinished-task count balanced.
                try:
                    self.raw_queue.get_nowait()
                    self.raw_queue.task_done()
                except queue.Empty:
                    pass
            self.raw_queue.put(item, timeout=0.5)
            return True
        except queue.Full:
            logger.warning(f"[WARN] Raw queue full, dropping frame from {self.camera_cfg.name}")
            return False

    def run(self):
        """Thread entry point: play segments until the stop event is set.

        An unexpected error restarts playback after three seconds. The
        restart is a loop rather than a recursive call so repeated failures
        cannot exhaust the stack, and it is skipped once a stop is requested.
        """
        if not self.index_code:
            logger.error(f"[ERROR] Camera {self.camera_cfg.name} has no index_code, reader thread exiting")
            return
        while not self.stop_event.is_set():
            try:
                self._playback_loop()
                return
            except Exception as e:
                logger.error(f"[ERROR] HLSFrameProcessor main error: {e}")
                logger.info("[INFO] HLSFrameProcessor will restart in 3 seconds...")
                self.stop_event.wait(3)

    def _playback_loop(self):
        """Play the initial segment, then keep following new segments."""
        initial_segment = self.initialize_playback()
        if not initial_segment:
            logger.error("[ERROR] Failed to initialize playback")
            return
        self.process_segment_with_rate_control(initial_segment)

        consecutive_failures = 0  # polls in a row that found no next segment
        while not self.stop_event.is_set():
            try:
                next_segment = self.find_next_segment()
                if next_segment is not None:
                    consecutive_failures = 0
                    logger.info(f"[INFO] Starting to process TS segment: {os.path.basename(next_segment)}")
                    self.current_segment_path = next_segment
                    self.process_segment_with_rate_control(next_segment)
                    logger.info(f"[INFO] Finished processing segment: {os.path.basename(next_segment)}")
                else:
                    consecutive_failures += 1
                    # We had to wait, so the next segment re-anchors the time base.
                    self.should_reset_time = True
                    # Back off: 0.02 s for the first 10 misses, 0.05 s up to 20, then 0.5 s.
                    if consecutive_failures <= 10:
                        sleep_time = 0.02
                    elif consecutive_failures <= 20:
                        sleep_time = 0.05
                    else:
                        sleep_time = 0.5
                    logger.warning(f"[WARN] No next segment found (failures: {consecutive_failures}), waiting {sleep_time}s...")
                    self.stop_event.wait(sleep_time)
            except Exception as e:
                logger.error(f"[ERROR] HLSFrameProcessor loop error: {e}")
                self.stop_event.wait(1)  # pause before retrying

    def process_segment_with_rate_control(self, segment_path):
        """Decode one TS segment and hand its frames to the paced queue writer.

        The segment is decoded in a single pass. When no time base exists
        yet, or a reset was requested, the first frame that carries a PTS
        re-anchors ``start_time`` / ``base_pts``. Errors are logged and
        swallowed so one bad segment does not end playback.

        Args:
            segment_path: Path of the TS file to decode.
        """
        try:
            # The context manager closes the container on every exit path,
            # including decode errors and an early return on stop.
            with av.open(segment_path) as container:
                video_stream = container.streams.video[0]
                opened_at = time.time()
                for packet in container.demux(video_stream):
                    for frame in packet.decode():
                        if self.stop_event.is_set():
                            return
                        if frame.pts is None:
                            continue
                        # PTS in milliseconds.
                        pts_ms = float(frame.pts * video_stream.time_base) * 1000
                        if self.base_pts is None or self.should_reset_time:
                            self.start_time = opened_at
                            self.base_pts = pts_ms
                            self.should_reset_time = False
                            logger.info(f"[INFO] Time axis reset: start_time={opened_at}, base_pts={self.base_pts:.1f}")
                        frame_np = frame.to_ndarray(format='bgr24')
                        self.process_frame_with_rate_control(frame_np, pts_ms)
        except Exception as e:
            logger.error(f"[ERROR] Failed to process TS file {segment_path}: {e}")
# ========================= 服务主类 =========================
class HLSKadianService:
    """Wire the per-camera HLS readers, the business processor and the WebSocket sender.

    Frames flow from the ``HLSFrameProcessor`` threads into ``raw_queue``;
    the processor obtained from ``get_processor(algorithm)`` is given both
    queues, and ``WebSocketSender`` is given ``ws_queue``.
    """

    def __init__(self, cameras: list[CameraConfig], hls_root_path: str, ws_host: str = "0.0.0.0", ws_port: int = 8765, algorithm: str = ""):
        """Create the queues and the downstream workers (nothing is started yet).

        Args:
            cameras: Cameras to read; those without an ``index`` are skipped in ``start``.
            hls_root_path: Root directory of the TS segments.
            ws_host: Host passed to ``WebSocketSender``.
            ws_port: Port passed to ``WebSocketSender``.
            algorithm: Key passed to ``get_processor`` to select the processor.
        """
        self.cameras = cameras
        self.hls_root_path = hls_root_path
        self.ws_host = ws_host
        self.ws_port = ws_port
        self.algorithm = algorithm
        self.stop_event = threading.Event()
        # Both queues are kept small so stale frames are dropped instead of adding latency.
        self.raw_queue = queue.Queue(maxsize=3)
        self.ws_queue = queue.Queue(maxsize=10)
        self.frame_processor_workers = []
        self.biz_processor = get_processor(self.algorithm)(self.raw_queue, self.ws_queue, self.stop_event, self.cameras)
        self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, self.ws_host, self.ws_port)

    def start(self):
        """Start the sender, the processor and one reader thread per usable camera."""
        # Make sure the HLS root exists before any reader starts polling it.
        os.makedirs(self.hls_root_path, exist_ok=True)
        self.ws_sender.start()
        self.biz_processor.start()
        for cam in self.cameras:
            if not cam.index:
                logger.warning(f"[WARN] Camera {cam.name} has no index_code, skipping")
                continue
            frame_processor = HLSFrameProcessor(cam, self.hls_root_path, self.raw_queue, self.stop_event)
            frame_processor.start()
            self.frame_processor_workers.append(frame_processor)
        logger.info("[INFO] HLS Kadian Service started (merged TS reader and frame processor)")

    def stop(self):
        """Signal every worker to stop and wait briefly for each of them.

        ``Queue.join()`` is intentionally not used: it has no timeout and
        blocks until every queued item has been acknowledged with
        ``task_done()``, which may never happen once the consumers have been
        told to stop. Workers are joined with a timeout instead, and whatever
        is left in the queues is discarded afterwards.
        """
        self.stop_event.set()
        for w in self.frame_processor_workers:
            w.join(timeout=2.0)
        self.biz_processor.join(timeout=2.0)
        self.ws_sender.join(timeout=2.0)
        self._drain_queue(self.raw_queue)
        self._drain_queue(self.ws_queue)
        logger.info("[INFO] Service stopped")

    @staticmethod
    def _drain_queue(pending: queue.Queue) -> None:
        """Discard every item still in ``pending`` without blocking."""
        while True:
            try:
                pending.get_nowait()
            except queue.Empty:
                return
            pending.task_done()
if __name__ == "__main__":
    # Command-line options, registered in declaration order.
    cli_options = (
        ("--cameras", dict(type=str, help="Cameras config in JSON format (or base64 encoded JSON)")),
        ("--config", dict(type=str, default="config.yaml", help="Path to config YAML file")),
        ("--ws-host", dict(type=str, default="0.0.0.0", help="WebSocket host")),
        ("--ws-port", dict(type=int, default=8765, help="WebSocket port")),
        ("--algorithm", dict(type=str, default="", help="Algorithm type")),
        ("--hls-root-path", dict(type=str, required=True, help="HLS root path for TS segments")),
    )
    arg_parser = argparse.ArgumentParser(description="HLS Service for Detection")
    for flag, flag_kwargs in cli_options:
        arg_parser.add_argument(flag, **flag_kwargs)
    cli_args = arg_parser.parse_args()

    # Global configuration must be loaded before anything else reads it.
    init_config(cli_args.config)

    # Cameras given on the command line take precedence over the config file.
    if not cli_args.cameras:
        camera_list = parse_cameras_from_yaml(cli_args.config)
        logger.info(f"[INFO] Loaded {len(camera_list)} cameras from config file: {cli_args.config}")
    else:
        camera_list = parse_cameras_from_json(cli_args.cameras)
        logger.info(f"[INFO] Loaded {len(camera_list)} cameras from command line argument")

    if not camera_list:
        logger.error("[ERROR] No cameras configured, exiting...")
        sys.exit(1)

    hls_service = HLSKadianService(
        camera_list,
        hls_root_path=cli_args.hls_root_path,
        ws_host=cli_args.ws_host,
        ws_port=cli_args.ws_port,
        algorithm=cli_args.algorithm,
    )
    hls_service.start()

    # Keep the main thread alive; Ctrl+C triggers an orderly shutdown.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        hls_service.stop()