Files
SupervisorAI/rtsp_service_ws_kadian.py
2026-02-27 15:20:53 +08:00

235 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# rtsp_service_kadian.py
# 融合 Kadian_Detect_1221.py + rtsp_service_ws.py
# 支持多路RTSP、抽帧、分段保存MP4、WebSocket推送图像与告警
import cv2
import os
import sys
import time
import argparse
import threading
import queue
from common.processor_factory import get_processor
from common.camera_config import CameraConfig, parse_cameras_from_json, parse_cameras_from_yaml
from common.contants import init_config
from test_cam import get_camera_preview_url
from utils.web_socket_sender import WebSocketSender
from utils.logger import get_logger
logger = get_logger(__name__)
# ========================= RTSP 抓流线程 =========================
class RTSPCaptureWorker(threading.Thread):
def __init__(self, camera_cfg: CameraConfig, raw_queue: queue.Queue, stop_event: threading.Event):
super().__init__(daemon=True)
self.camera_cfg = camera_cfg
self.raw_queue = raw_queue
self.stop_event = stop_event
# 添加重连计数器
self.reconnect_count = 0
self.max_reconnects = 5
self.rtsp_url = ""
def run(self):
while not self.stop_event.is_set():
try:
if self.reconnect_count >= self.max_reconnects:
logger.warning(f"[WARN] RTSP: {self.camera_cfg.name} reach max reconnects, refresh url")
self.reconnect_count = 0
new_url = self.refresh_video_url()
if new_url:
self.rtsp_url = new_url
else:
logger.error(f"[ERROR] refresh RTSP URL is empty, do nothing")
# 检查rtsp_url是否为空或None如果是则重新获取
if not self.rtsp_url:
logger.warning(f"[WARN] RTSP URL is empty, refreshing...")
new_url = self.refresh_video_url()
if new_url:
self.rtsp_url = new_url
else:
logger.error(f"[ERROR] RTSP URL is still empty, retrying in 5 seconds")
time.sleep(5)
continue
# 方法1使用TCP传输更稳定
rtsp_url = self.rtsp_url
if "?" not in rtsp_url:
rtsp_url += "?transport=tcp" # 强制TCP传输
else:
rtsp_url += "&transport=tcp"
# 方法2添加更多FFmpeg参数
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
# 方法3设置缓冲区大小
cap.set(cv2.CAP_PROP_BUFFERSIZE, 10) # 增加缓冲区
# 方法4设置超时和重连参数
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = \
"rtsp_transport;tcp|buffer_size;1024000|max_delay;500000|stimeout;2000000"
# 方法5设置解码器flags忽略解码错误
# cap.set(cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY)
if not cap.isOpened():
logger.error(f"[ERROR] Cannot open RTSP: {self.rtsp_url}")
time.sleep(2)
self.reconnect_count += 1
continue
logger.info(f"[INFO] Successfully opened RTSP: {self.name}")
self.reconnect_count = 0 # 重置重连计数
# # 设置帧率(可选)
# cap.set(cv2.CAP_PROP_FPS, 25)
while not self.stop_event.is_set():
ret, frame = cap.read()
if not ret:
# 检查流是否结束
logger.warning(f"[WARN] Failed to read frame from {self.camera_cfg.name}")
# 检查是否还有数据
time.sleep(0.1)
# 尝试几次后重连
break
item = {
"camera_id": self.camera_cfg.id,
"camera_name": self.camera_cfg.name,
"timestamp": time.time(),
"frame": frame,
}
try:
# 添加队列满时的处理
if self.raw_queue.full():
# 丢弃最旧的一帧
try:
self.raw_queue.get_nowait()
self.raw_queue.task_done()
except queue.Empty:
pass
self.raw_queue.put(item, timeout=0.5)
except queue.Full:
logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}")
continue
# 控制读取速度,避免过快
time.sleep(0.02) # 约50ms间隔
cap.release()
except Exception as e:
logger.error(f"[ERROR] Error in RTSP capture for {self.camera_cfg.name}: {e}")
time.sleep(2)
self.reconnect_count += 1
if self.reconnect_count >= self.max_reconnects:
logger.error(f"[ERROR] Max reconnects reached for {self.camera_cfg.name}, stopping.")
def refresh_video_url(self):
"""
重新通过视频ID获取视频URL调用test_cam.py中的get_camera_preview_url方法
返回:
str: 新的视频URL如果获取失败则返回None
"""
try:
# 获取视频IDcamera_cfg.index
video_id = self.camera_cfg.index
# 调用test_cam.py中的函数
result = get_camera_preview_url(video_id)
# 解析结果与test_cam.py相同
if 'data' in result and 'url' in result['data']:
new_url = result['data']['url']
logger.info(f"[INFO] get rtsp url success, URL: {new_url}")
return new_url
else:
logger.error(f"[ERROR] get rtsp url failed: {result}")
return None
except Exception as e:
logger.error(f"[ERROR] get rtsp url error: {str(e)}")
return None
# ========================= 服务主类 =========================
class RTSPService:
def __init__(self, cameras: list[CameraConfig], ws_host: str = "0.0.0.0", ws_port: int = 8765, algorithm: str = ""):
self.cameras = cameras
self.ws_host = ws_host
self.ws_port = ws_port
self.algorithm = algorithm
self.stop_event = threading.Event()
self.raw_queue = queue.Queue(maxsize=2)
self.ws_queue = queue.Queue(maxsize=1000)
self.capture_workers = []
self.biz_processor = get_processor(self.algorithm)(self.raw_queue, self.ws_queue, self.stop_event, self.cameras)
self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, self.ws_host, self.ws_port)
def start(self):
self.ws_sender.start()
self.processor.start()
for cam in self.cameras:
w = RTSPCaptureWorker(cam, self.raw_queue, self.stop_event)
w.start()
self.capture_workers.append(w)
logger.info(f"[INFO] RTSP Service started (algorithm={self.algorithm}, ws={self.ws_host}:{self.ws_port})")
def stop(self):
self.stop_event.set()
self.raw_queue.join()
self.ws_queue.join()
for w in self.capture_workers:
w.join(timeout=2.0)
self.processor.join(timeout=2.0)
self.ws_sender.join(timeout=2.0)
logger.info("[INFO] Service stopped")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="RTSP Service for Detection")
parser.add_argument("--cameras", type=str, help="Cameras config in JSON format (or base64 encoded JSON)")
parser.add_argument("--config", type=str, default="config.yaml", help="Path to config YAML file")
parser.add_argument("--ws-host", type=str, default="0.0.0.0", help="WebSocket host")
parser.add_argument("--ws-port", type=int, default=8765, help="WebSocket port")
parser.add_argument("--algorithm", type=str, default="", help="Algorithm type")
args = parser.parse_args()
# 初始化全局配置
init_config(args.config)
# 优先使用命令行传入的 cameras JSON否则读取配置文件
if args.cameras:
cameras = parse_cameras_from_json(args.cameras)
logger.info(f"[INFO] Loaded {len(cameras)} cameras from command line argument")
else:
cameras = parse_cameras_from_yaml(args.config)
logger.info(f"[INFO] Loaded {len(cameras)} cameras from config file: {args.config}")
if not cameras:
logger.error("[ERROR] No cameras configured, exiting...")
sys.exit(1)
service = RTSPService(cameras, ws_host=args.ws_host, ws_port=args.ws_port, algorithm=args.algorithm)
service.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
service.stop()