268 lines
9.5 KiB
Python
268 lines
9.5 KiB
Python
# rtsp_service_kadian.py
|
||
# Merges Kadian_Detect_1221.py and rtsp_service_ws.py
|
||
# Supports multiple RTSP streams, frame sampling, segmented MP4 saving, and pushing images and alerts over WebSocket
|
||
|
||
import cv2
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import time
|
||
import argparse
|
||
import threading
|
||
import queue
|
||
import yaml
|
||
import base64
|
||
|
||
from dataclasses import dataclass
|
||
|
||
|
||
from biz.checkpoint.checkpoint_biz import FrameProcessorWorker
|
||
from common.camera_config import CameraConfig
|
||
from test_cam import get_camera_preview_url
|
||
from utils.web_socket_sender import WebSocketSender
|
||
|
||
from utils.logger import get_logger
|
||
# Module-level logger, obtained from the project's get_logger helper for this module's name.
logger = get_logger(__name__)
|
||
|
||
# Host handed to WebSocketSender by RTSPService. "0.0.0.0" is the IPv4 wildcard address;
# NOTE(review): presumably the sender binds/listens on it - confirm in utils.web_socket_sender.
WS_HOST = "0.0.0.0"
|
||
# Port handed to WebSocketSender by RTSPService, together with WS_HOST.
WS_PORT = 8765
|
||
|
||
|
||
# ========================= RTSP capture thread =========================
|
||
class RTSPCaptureWorker(threading.Thread):
    """Daemon thread that reads frames from one camera's RTSP stream into a queue.

    The stream URL is not stored in the camera configuration: it is fetched on
    demand through ``get_camera_preview_url`` (keyed by ``camera_cfg.index``)
    and fetched again once ``max_reconnects`` consecutive failures have been
    counted.

    Every frame is published to ``raw_queue`` as a dict with the keys
    ``camera_id``, ``camera_name``, ``timestamp`` (``time.time()``) and
    ``frame``. When the queue is full the oldest queued item is discarded so
    that the newest frame can be enqueued.
    """

    # FFmpeg options that OpenCV picks up from the environment when a capture
    # is opened: TCP transport, receive buffer size, max delay and socket timeout.
    _FFMPEG_CAPTURE_OPTIONS = (
        "rtsp_transport;tcp|buffer_size;1024000|max_delay;500000|stimeout;2000000"
    )

    def __init__(self, camera_cfg: CameraConfig, raw_queue: queue.Queue, stop_event: threading.Event):
        """Store the collaborators; no network access happens here.

        Args:
            camera_cfg: Configuration of the camera this worker serves.
            raw_queue: Queue that receives the captured frame items.
            stop_event: Event that, once set, makes ``run`` return.
        """
        super().__init__(daemon=True)
        self.camera_cfg = camera_cfg
        self.raw_queue = raw_queue
        self.stop_event = stop_event
        # Consecutive open/capture failures since the last successful open
        # (or since the last URL refresh).
        self.reconnect_count = 0
        self.max_reconnects = 5
        # Filled lazily by run() through refresh_video_url().
        self.rtsp_url = ""

    def run(self):
        """Open the stream, pump frames, and reconnect until ``stop_event`` is set."""
        while not self.stop_event.is_set():
            cap = None
            try:
                if self.reconnect_count >= self.max_reconnects:
                    logger.warning(f"[WARN] RTSP: {self.camera_cfg.name} reach max reconnects, refresh url")
                    self.reconnect_count = 0
                    new_url = self.refresh_video_url()
                    if new_url:
                        self.rtsp_url = new_url
                    else:
                        # Keep retrying with the previous URL.
                        logger.error("[ERROR] refresh RTSP URL is empty, do nothing")

                # No URL yet (first iteration or earlier lookup failed): fetch one.
                if not self.rtsp_url:
                    logger.warning("[WARN] RTSP URL is empty, refreshing...")
                    new_url = self.refresh_video_url()
                    if new_url:
                        self.rtsp_url = new_url
                    else:
                        logger.error("[ERROR] RTSP URL is still empty, retrying in 5 seconds")
                        # Event.wait instead of sleep so a stop request is not delayed.
                        self.stop_event.wait(5)
                        continue

                cap = self._open_capture()
                if not cap.isOpened():
                    logger.error(f"[ERROR] Cannot open RTSP: {self.rtsp_url}")
                    self.stop_event.wait(2)
                    self.reconnect_count += 1
                    continue

                logger.info(f"[INFO] Successfully opened RTSP: {self.camera_cfg.name}")
                self.reconnect_count = 0  # a successful open resets the failure counter

                self._pump_frames(cap)

            except Exception as e:
                logger.error(f"[ERROR] Error in RTSP capture for {self.camera_cfg.name}: {e}")
                self.stop_event.wait(2)
                self.reconnect_count += 1

            finally:
                # Release on every path (failed open, read failure, exception)
                # so the underlying capture handle is never leaked.
                if cap is not None:
                    cap.release()

        if self.reconnect_count >= self.max_reconnects:
            logger.error(f"[ERROR] Max reconnects reached for {self.camera_cfg.name}, stopping.")

    def _open_capture(self):
        """Create a ``cv2.VideoCapture`` for the current URL using the FFmpeg backend."""
        # The options must be in the environment *before* the capture is
        # created; setting them afterwards only affects later captures.
        os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = self._FFMPEG_CAPTURE_OPTIONS

        # Also request TCP transport through the URL query string.
        separator = "&" if "?" in self.rtsp_url else "?"
        cap = cv2.VideoCapture(f"{self.rtsp_url}{separator}transport=tcp", cv2.CAP_FFMPEG)

        # Ask the backend for a larger internal frame buffer.
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 10)
        return cap

    def _pump_frames(self, cap):
        """Read frames from ``cap`` and enqueue them until a read fails or stop is requested."""
        while not self.stop_event.is_set():
            ret, frame = cap.read()
            if not ret:
                logger.warning(f"[WARN] Failed to read frame from {self.camera_cfg.name}")
                time.sleep(0.1)
                # Leave the loop so run() releases the capture and reconnects.
                break

            item = {
                "camera_id": self.camera_cfg.id,
                "camera_name": self.camera_cfg.name,
                "timestamp": time.time(),
                "frame": frame,
            }

            try:
                if self.raw_queue.full():
                    # Drop the oldest queued item to make room for the new frame.
                    try:
                        self.raw_queue.get_nowait()
                        # The dropped item will never be processed; acknowledge
                        # it so Queue.join() accounting stays balanced.
                        self.raw_queue.task_done()
                    except queue.Empty:
                        pass

                self.raw_queue.put(item, timeout=0.5)
            except queue.Full:
                logger.warning(f"[WARN] Queue full, dropping frame from {self.camera_cfg.name}")
                continue

            # Throttle the read loop (20 ms pause per frame).
            time.sleep(0.02)

    def refresh_video_url(self):
        """Fetch a fresh stream URL for this camera via ``get_camera_preview_url``.

        The lookup is keyed by ``camera_cfg.index`` and the URL is read from
        ``result['data']['url']``.

        Returns:
            The new URL, or ``None`` when the response lacks that field or the
            lookup raised.
        """
        try:
            video_id = self.camera_cfg.index
            result = get_camera_preview_url(video_id)

            if 'data' in result and 'url' in result['data']:
                new_url = result['data']['url']
                logger.info(f"[INFO] get rtsp url success, URL: {new_url}")
                return new_url

            logger.error(f"[ERROR] get rtsp url failed: {result}")
            return None

        except Exception as e:
            logger.error(f"[ERROR] get rtsp url error: {str(e)}")
            return None
|
||
|
||
|
||
|
||
# ========================= Service main class =========================
|
||
class RTSPService:
    """Wires capture workers, the frame processor and the WebSocket sender together.

    All capture workers share one ``raw_queue`` (bounded to 2 items) feeding the
    ``FrameProcessorWorker``, which in turn is given ``ws_queue`` (bounded to
    1000 items) that is also handed to the ``WebSocketSender``. A single
    ``stop_event`` is shared by every thread.
    """

    # Upper bound, in seconds, that stop() waits for each queue to be fully
    # acknowledged before it carries on with the shutdown.
    QUEUE_DRAIN_TIMEOUT = 2.0

    def __init__(self, cameras: list[CameraConfig]):
        """Create the queues and the processor/sender threads (nothing is started).

        Args:
            cameras: Cameras to capture; one ``RTSPCaptureWorker`` is created
                per entry when ``start`` is called.
        """
        self.cameras = cameras

        self.stop_event = threading.Event()
        self.raw_queue = queue.Queue(maxsize=2)
        self.ws_queue = queue.Queue(maxsize=1000)

        self.capture_workers = []
        self.processor = FrameProcessorWorker(self.raw_queue, self.ws_queue, self.stop_event, self.cameras)
        self.ws_sender = WebSocketSender(self.ws_queue, self.stop_event, WS_HOST, WS_PORT)

    def start(self):
        """Start the sender and processor first, then one capture worker per camera."""
        self.ws_sender.start()
        self.processor.start()
        for cam in self.cameras:
            w = RTSPCaptureWorker(cam, self.raw_queue, self.stop_event)
            w.start()
            self.capture_workers.append(w)
        logger.info("[INFO] Kadian RTSP Service started")

    def stop(self):
        """Signal every thread to stop and wait, with bounded timeouts, for them to finish.

        ``Queue.join()`` has no timeout and only returns once every queued item
        has been acknowledged with ``task_done()``. After ``stop_event`` is set
        the consumers may exit while items are still pending, so an unbounded
        join here could block shutdown forever; each queue is therefore given
        at most ``QUEUE_DRAIN_TIMEOUT`` seconds.
        """
        self.stop_event.set()

        for label, pending in (("raw", self.raw_queue), ("ws", self.ws_queue)):
            if not self._wait_for_queue(pending, self.QUEUE_DRAIN_TIMEOUT):
                logger.warning(
                    f"[WARN] {label} queue not drained within {self.QUEUE_DRAIN_TIMEOUT}s, continuing shutdown"
                )

        for w in self.capture_workers:
            w.join(timeout=2.0)
        self.processor.join(timeout=2.0)
        self.ws_sender.join(timeout=2.0)
        logger.info("[INFO] Service stopped")

    @staticmethod
    def _wait_for_queue(pending, timeout):
        """Return True if ``pending.join()`` completes within ``timeout`` seconds."""
        # Run the blocking join in a daemon helper thread so it can be abandoned
        # when the deadline passes; the helper cannot keep the process alive.
        waiter = threading.Thread(target=pending.join, daemon=True)
        waiter.start()
        waiter.join(timeout)
        return not waiter.is_alive()
|
||
|
||
|
||
def parse_cameras_from_json(json_str: str) -> list[CameraConfig]:
    """Build camera configs from a JSON string, optionally base64-encoded.

    The text is parsed as JSON first. Only if that fails is it treated as
    base64 (whitespace such as line wraps is removed, and characters outside
    the base64 alphabet are rejected rather than silently skipped) and the
    decoded UTF-8 text parsed as JSON.

    Args:
        json_str: A JSON array of camera objects, or the base64 encoding of
            one. Each object needs an ``id``; ``name`` defaults to
            ``cam_<id>``; ``index`` and ``params`` default to ``None``.

    Returns:
        One ``CameraConfig`` per entry, or an empty list if the input cannot
        be parsed or an entry is malformed (the error is logged, not raised).
    """
    try:
        try:
            cameras_data = json.loads(json_str)
        except ValueError as json_err:
            # Not plain JSON: try strict base64. binascii.Error and
            # UnicodeDecodeError are both ValueError subclasses.
            try:
                compact = "".join(json_str.split())
                decoded = base64.b64decode(compact, validate=True).decode("utf-8")
            except ValueError:
                # Not base64 either, so the JSON error is the informative one.
                raise json_err from None
            cameras_data = json.loads(decoded)

        return [
            CameraConfig(
                id=c["id"],
                name=c.get("name", f"cam_{c['id']}"),
                index=c.get("index"),
                params=c.get("params"),
            )
            for c in cameras_data
        ]
    except Exception as e:
        # Deliberate catch-all boundary: callers rely on getting [] instead of
        # an exception and treat an empty result as "no cameras configured".
        logger.error(f"[ERROR] Failed to parse cameras JSON: {e}")
        return []
|
||
|
||
|
||
def parse_cameras_from_yaml(yaml_path: str) -> list[CameraConfig]:
    """Build camera configs from the ``cameras`` list of a YAML file.

    Args:
        yaml_path: Path of a UTF-8 YAML file whose top level is a mapping
            with a ``cameras`` list. Each entry needs an ``id``; ``name``
            defaults to ``cam_<id>``; ``index`` and ``params`` default to
            ``None``.

    Returns:
        One ``CameraConfig`` per entry. An empty list is returned when the
        file is empty, has no ``cameras`` key, or the key is null.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the file is not valid YAML.
        KeyError: If a camera entry has no ``id``.
    """
    with open(yaml_path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)

    # safe_load yields None for an empty document, and any YAML type for a
    # non-mapping top level; neither has a usable "cameras" section.
    if not isinstance(cfg, dict):
        if cfg is not None:
            logger.error(f"[ERROR] Top level of {yaml_path} is not a mapping, no cameras loaded")
        return []

    # "cameras:" with no value parses to None, hence the "or []".
    entries = cfg.get("cameras") or []
    return [
        CameraConfig(
            id=c["id"],
            name=c.get("name", f"cam_{c['id']}"),
            index=c.get("index"),
            params=c.get("params"),
        )
        for c in entries
    ]
|
||
|
||
|
||
if __name__ == "__main__":
    # Entry point: load the camera list, start the service, and run until interrupted.
    parser = argparse.ArgumentParser(description="RTSP Service for Kadian Detection")
    parser.add_argument("--cameras", type=str, help="Cameras config in JSON format (or base64 encoded JSON)")
    parser.add_argument("--config", type=str, default="config.yaml", help="Path to config YAML file")
    args = parser.parse_args()

    # A cameras JSON given on the command line takes precedence over the config file.
    if args.cameras:
        cameras = parse_cameras_from_json(args.cameras)
        logger.info(f"[INFO] Loaded {len(cameras)} cameras from command line argument")
    else:
        try:
            cameras = parse_cameras_from_yaml(args.config)
        except (OSError, yaml.YAMLError) as e:
            # Missing/unreadable or syntactically invalid config: report it
            # through the logger and exit instead of dying with a traceback.
            logger.error(f"[ERROR] Failed to load config file {args.config}: {e}")
            sys.exit(1)
        logger.info(f"[INFO] Loaded {len(cameras)} cameras from config file: {args.config}")

    if not cameras:
        logger.error("[ERROR] No cameras configured, exiting...")
        sys.exit(1)

    service = RTSPService(cameras)
    service.start()
    try:
        # The worker threads do all the work; the main thread only stays alive.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("[INFO] Interrupted, shutting down...")
    finally:
        # Stop the service however the wait loop ends, not only on Ctrl+C.
        service.stop()
|