# rtsp_service_kadian.py
# Merges Kadian_Detect_1221.py and rtsp_service_ws.py.
# Supports multiple RTSP streams, frame sampling, segmented MP4 saving, and
# WebSocket push of images and alerts.

import cv2
import os
import sys
import time
import argparse
import threading
import queue

from common.processor_factory import get_processor
from common.camera_config import CameraConfig, parse_cameras_from_json, parse_cameras_from_yaml
from common.contants import init_config
from test_cam import get_camera_preview_url
from utils.web_socket_sender import WebSocketSender
from utils.logger import get_logger

logger = get_logger(__name__)


# ========================= RTSP capture thread =========================
class RTSPCaptureWorker(threading.Thread):
    """Daemon thread that reads frames from one camera and feeds ``raw_queue``.

    The worker keeps reconnecting until ``stop_event`` is set. When the RTSP
    URL is empty, or after ``max_reconnects`` consecutive failures, it asks
    ``refresh_video_url`` for a new URL.
    """

    # Options handed to OpenCV's FFmpeg backend through the environment.
    # NOTE(review): "stimeout" may not be recognised by every FFmpeg build —
    # confirm against the deployed version.
    FFMPEG_CAPTURE_OPTIONS = (
        "rtsp_transport;tcp|buffer_size;1024000|max_delay;500000|stimeout;2000000"
    )

    def __init__(self, camera_cfg: CameraConfig, raw_queue: queue.Queue, stop_event: threading.Event):
        """Store the camera configuration, output queue and shared stop flag."""
        super().__init__(daemon=True)
        self.camera_cfg = camera_cfg
        self.raw_queue = raw_queue
        self.stop_event = stop_event
        # Consecutive failed attempts since the last successful open.
        self.reconnect_count = 0
        self.max_reconnects = 5
        self.rtsp_url = ""

    def run(self):
        """Connect, read frames, and reconnect on failure until stopped."""
        while not self.stop_event.is_set():
            cap = None
            try:
                if not self._ensure_rtsp_url():
                    # Waiting on the event (instead of time.sleep) lets a stop
                    # request interrupt the back-off immediately.
                    self.stop_event.wait(5)
                    continue

                # The environment variable must be in place BEFORE the capture
                # is created, otherwise this connection attempt ignores it.
                # It is process-wide; every worker writes the same value.
                os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = self.FFMPEG_CAPTURE_OPTIONS
                cap = cv2.VideoCapture(self._build_stream_url(), cv2.CAP_FFMPEG)
                cap.set(cv2.CAP_PROP_BUFFERSIZE, 10)

                if not cap.isOpened():
                    logger.error(f"[ERROR] Cannot open RTSP: {self.rtsp_url}")
                    self.stop_event.wait(2)
                    self.reconnect_count += 1
                    continue

                logger.info(f"[INFO] Successfully opened RTSP: {self.camera_cfg.name}")
                self.reconnect_count = 0  # a successful open resets the counter

                self._pump_frames(cap)
            except Exception as e:
                # Thread boundary: log and retry rather than let the worker die.
                logger.error(f"[ERROR] Error in RTSP capture for {self.camera_cfg.name}: {e}")
                self.stop_event.wait(2)
                self.reconnect_count += 1
                if self.reconnect_count >= self.max_reconnects:
                    logger.error(f"[ERROR] Max reconnects reached for {self.camera_cfg.name}, stopping.")
            finally:
                # Release on every path (failed open, read failure, exception).
                if cap is not None:
                    cap.release()

    def _ensure_rtsp_url(self) -> bool:
        """Refresh the RTSP URL when needed; return False if none is available."""
        if self.reconnect_count >= self.max_reconnects:
            logger.warning(f"[WARN] RTSP: {self.camera_cfg.name} reach max reconnects, refresh url")
            self.reconnect_count = 0
            new_url = self.refresh_video_url()
            if new_url:
                self.rtsp_url = new_url
            else:
                # Keep the previous URL and try it again.
                logger.error("[ERROR] refresh RTSP URL is empty, do nothing")

        if self.rtsp_url:
            return True

        logger.warning("[WARN] RTSP URL is empty, refreshing...")
        new_url = self.refresh_video_url()
        if new_url:
            self.rtsp_url = new_url
            return True

        logger.error("[ERROR] RTSP URL is still empty, retrying in 5 seconds")
        return False

    def _build_stream_url(self) -> str:
        """Return ``self.rtsp_url`` with a ``transport=tcp`` query parameter appended."""
        separator = "&" if "?" in self.rtsp_url else "?"
        return f"{self.rtsp_url}{separator}transport=tcp"

    def _pump_frames(self, cap) -> None:
        """Read frames from ``cap`` and enqueue them until a read fails or stop is set."""
        while not self.stop_event.is_set():
            ret, frame = cap.read()
            if not ret:
                logger.warning(f"[WARN] Failed to read frame from {self.camera_cfg.name}")
                self.stop_event.wait(0.1)
                # Leave the loop so the caller releases the capture and reconnects.
                break

            item = {
                "camera_id": self.camera_cfg.id,
                "camera_name": self.camera_cfg.name,
                "timestamp": time.time(),
                "camera_index": self.camera_cfg.index,
                "frame": frame,
            }

            if self._enqueue(item):
                # Throttle reads: pause about 20 ms after each queued frame.
                self.stop_event.wait(0.02)

    def _enqueue(self, item: dict) -> bool:
        """Put ``item`` on ``raw_queue``, evicting the oldest entry when full.

        Returns True if the item was queued, False if it was dropped because
        the queue stayed full for the 0.5 s put timeout.
        """
        try:
            if self.raw_queue.full():
                try:
                    # Drop the oldest frame and acknowledge it so the queue's
                    # unfinished-task counter stays balanced.
                    self.raw_queue.get_nowait()
                    self.raw_queue.task_done()
                except queue.Empty:
                    # A consumer emptied the queue in between; nothing to evict.
                    pass
            self.raw_queue.put(item, timeout=0.5)
        except queue.Full:
            logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}")
            return False
        return True

    def refresh_video_url(self):
        """Fetch a fresh stream URL for this camera via ``get_camera_preview_url``.

        The camera's ``index`` is used as the video ID.

        Returns:
            str: the new video URL, or None if the lookup failed.
        """
        try:
            video_id = self.camera_cfg.index
            result = get_camera_preview_url(video_id)

            if 'data' in result and 'url' in result['data']:
                new_url = result['data']['url']
                logger.info(f"[INFO] get rtsp url success, URL: {new_url}")
                return new_url

            logger.error(f"[ERROR] get rtsp url failed: {result}")
            return None
        except Exception as e:
            logger.error(f"[ERROR] get rtsp url error: {str(e)}")
            return None


# ========================= Service main class =========================
class RTSPService:
    """Wires capture workers, the business processor and the WebSocket sender together."""

    def __init__(self, cameras: list[CameraConfig], ws_host: str = "0.0.0.0", ws_port: int = 8765,
                 algorithm: str = ""):
        """Create the shared queues and the processing / sending components.

        Args:
            cameras: camera configurations; one capture worker is started per entry.
            ws_host: host passed to ``WebSocketSender``.
            ws_port: port passed to ``WebSocketSender``.
            algorithm: key passed to ``get_processor`` to select the processor class.
        """
        self.cameras = cameras
        self.ws_host = ws_host
        self.ws_port = ws_port
        self.algorithm = algorithm

        self.stop_event = threading.Event()
        # One raw-frame queue is shared by all capture workers.
        self.raw_queue = queue.Queue(maxsize=2)
        self.ws_queue = queue.Queue(maxsize=10)

        self.capture_workers = []
        self.biz_processor = get_processor(self.algorithm)(
            self.raw_queue, self.ws_queue, self.stop_event, self.cameras
        )
        self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, self.ws_host, self.ws_port)

    def start(self):
        """Start the sender, the processor, then one capture worker per camera."""
        self.ws_sender.start()
        self.biz_processor.start()

        for cam in self.cameras:
            worker = RTSPCaptureWorker(cam, self.raw_queue, self.stop_event)
            worker.start()
            self.capture_workers.append(worker)

        logger.info(f"[INFO] RTSP Service started (algorithm={self.algorithm}, ws={self.ws_host}:{self.ws_port})")

    def stop(self):
        """Signal every component to stop and wait briefly for each to finish.

        The queues are deliberately not ``join()``-ed: ``Queue.join`` has no
        timeout and would block forever if a consumer exits on the stop event
        while items are still unacknowledged.
        """
        self.stop_event.set()

        for worker in self.capture_workers:
            worker.join(timeout=2.0)
        self.biz_processor.join(timeout=2.0)
        self.ws_sender.join(timeout=2.0)

        logger.info("[INFO] Service stopped")


def main():
    """Parse command-line arguments, start the service and run until Ctrl+C."""
    parser = argparse.ArgumentParser(description="RTSP Service for Detection")
    parser.add_argument("--cameras", type=str, help="Cameras config in JSON format (or base64 encoded JSON)")
    parser.add_argument("--config", type=str, default="config.yaml", help="Path to config YAML file")
    parser.add_argument("--ws-host", type=str, default="0.0.0.0", help="WebSocket host")
    parser.add_argument("--ws-port", type=int, default=8765, help="WebSocket port")
    parser.add_argument("--algorithm", type=str, default="", help="Algorithm type")
    args = parser.parse_args()

    # Initialise the global configuration.
    init_config(args.config)

    # Cameras passed on the command line take precedence over the config file.
    if args.cameras:
        cameras = parse_cameras_from_json(args.cameras)
        logger.info(f"[INFO] Loaded {len(cameras)} cameras from command line argument")
    else:
        cameras = parse_cameras_from_yaml(args.config)
        logger.info(f"[INFO] Loaded {len(cameras)} cameras from config file: {args.config}")

    if not cameras:
        logger.error("[ERROR] No cameras configured, exiting...")
        sys.exit(1)

    service = RTSPService(cameras, ws_host=args.ws_host, ws_port=args.ws_port, algorithm=args.algorithm)
    service.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        service.stop()


if __name__ == "__main__":
    main()