Files
AItst/AIMonitor/rtsp_service_ws.py
2026-02-08 14:33:45 +08:00

407 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import time
import threading
import queue
import yaml
import os
import json
import base64
import asyncio
import websockets
from dataclasses import dataclass
from typing import Optional, Dict, Any, Tuple
# =========================
# 配置与数据结构
# =========================
@dataclass
class CameraConfig:
id: int
name: str
rtsp_url: str
RTSP_TARGET_FPS = 10.0 # 固定 10 帧/秒
FRAMES_PER_SEGMENT = 600 # 每 600 帧一个 mp4
VIDEO_OUTPUT_DIR = "./videos" # 视频输出目录
WS_HOST = "0.0.0.0" # WebSocket 服务端监听地址
WS_PORT = 8765 # WebSocket 服务端端口
# 已连接的 WebSocket 客户端集合
ws_clients = set()
# =========================
# WebSocket 服务线程
# =========================
class WebSocketSender(threading.Thread):
"""
WebSocket 服务端线程:
- 在 WS_HOST:WS_PORT 上启动 websockets 服务器
- 从 send_queue 中读取消息,广播给所有已连接客户端
"""
def __init__(self, send_queue: "queue.Queue[Dict[str, Any]]", stop_event: threading.Event):
super().__init__(daemon=True)
self.send_queue = send_queue
self.stop_event = stop_event
async def _ws_handler(self, websocket):
# 新客户端连接
ws_clients.add(websocket)
try:
async for _ in websocket:
# 当前忽略客户端发送的消息
pass
finally:
# 客户端断开
ws_clients.discard(websocket)
async def _broadcaster(self):
"""从队列中取出消息并广播给所有连接的客户端"""
while not self.stop_event.is_set():
try:
# 在线程池中阻塞等待队列消息
msg = await asyncio.to_thread(self.send_queue.get, timeout=0.5)
except queue.Empty:
continue
data = json.dumps(msg)
dead = []
for ws in list(ws_clients):
try:
await ws.send(data)
except Exception:
dead.append(ws)
for ws in dead:
ws_clients.discard(ws)
self.send_queue.task_done()
async def _run_async(self):
async with websockets.serve(self._ws_handler, WS_HOST, WS_PORT):
print(f"[INFO] WebSocket server started at ws://{WS_HOST}:{WS_PORT}")
await self._broadcaster()
def run(self):
asyncio.run(self._run_async())
# =========================
# RTSP 抓流线程
# =========================
class RTSPCaptureWorker(threading.Thread):
"""
只负责从 RTSP 读取原始帧,放入 raw_frame_queue。
不负责抽帧、不负责写视频。
"""
def __init__(
self,
camera_cfg: CameraConfig,
raw_frame_queue: "queue.Queue[Dict[str, Any]]",
stop_event: threading.Event,
):
super().__init__(daemon=True)
self.camera_cfg = camera_cfg
self.raw_frame_queue = raw_frame_queue
self.stop_event = stop_event
def run(self):
cap = cv2.VideoCapture(self.camera_cfg.rtsp_url, cv2.CAP_FFMPEG)
if not cap.isOpened():
print(f"[ERROR] Cannot open RTSP stream: {self.camera_cfg.rtsp_url}")
return
print(f"[INFO] Start capturing: id={self.camera_cfg.id}, name={self.camera_cfg.name}")
while not self.stop_event.is_set():
ok, frame = cap.read()
if not ok:
print(f"[WARN] Failed to read frame from camera {self.camera_cfg.id}, retrying...")
time.sleep(0.2)
continue
ts = time.time()
item = {
"camera_id": self.camera_cfg.id,
"camera_name": self.camera_cfg.name,
"timestamp": ts,
"frame": frame,
}
try:
self.raw_frame_queue.put(item, timeout=1.0)
except queue.Full:
print(f"[WARN] Raw frame queue full, drop frame from camera {self.camera_cfg.id}")
cap.release()
print(f"[INFO] Stop capturing: id={self.camera_cfg.id}")
# =========================
# 帧处理线程(抽帧 + 写mp4 + 调用用户函数 + 发WebSocket消息
# =========================
class FrameProcessorWorker(threading.Thread):
def __init__(
self,
raw_frame_queue: "queue.Queue[Dict[str, Any]]",
ws_send_queue: "queue.Queue[Dict[str, Any]]",
stop_event: threading.Event,
):
super().__init__(daemon=True)
self.raw_frame_queue = raw_frame_queue
self.ws_send_queue = ws_send_queue
self.stop_event = stop_event
# 每个摄像头独立维护视频写入状态
self.video_writers: Dict[int, cv2.VideoWriter] = {}
self.video_frame_counts: Dict[int, int] = {}
self.video_segment_start_ts: Dict[int, float] = {}
self.video_segment_filenames: Dict[int, str] = {}
os.makedirs(VIDEO_OUTPUT_DIR, exist_ok=True)
# 控制 10fps 抽帧:记录每个摄像头上次处理时间
self.last_process_ts: Dict[int, float] = {}
def _get_video_writer(self, camera_id: int, frame) -> Tuple[cv2.VideoWriter, str]:
"""
获取(或新建)当前摄像头的 VideoWriter。
如果当前 segment 不存在,则新建一个,文件名由第一帧时间命名。
"""
writer = self.video_writers.get(camera_id)
if writer is not None:
return writer, self.video_segment_filenames[camera_id]
h, w = frame.shape[:2]
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
start_ts = time.time()
self.video_segment_start_ts[camera_id] = start_ts
ts_str = time.strftime("%Y%m%d_%H%M%S", time.localtime(start_ts))
filename = f"{ts_str}_cam{camera_id}.mp4"
filepath = os.path.join(VIDEO_OUTPUT_DIR, filename)
writer = cv2.VideoWriter(filepath, fourcc, RTSP_TARGET_FPS, (w, h))
self.video_writers[camera_id] = writer
self.video_frame_counts[camera_id] = 0
self.video_segment_filenames[camera_id] = filepath
print(f"[INFO] Start new segment: camera={camera_id}, file={filepath}")
return writer, filepath
def _close_segment_if_needed(self, camera_id: int):
"""
如果当前segment达到 FRAMES_PER_SEGMENT则关闭并清理状态。
"""
count = self.video_frame_counts.get(camera_id, 0)
if count >= FRAMES_PER_SEGMENT:
writer = self.video_writers.get(camera_id)
if writer is not None:
writer.release()
print(f"[INFO] Close segment: camera={camera_id}, file={self.video_segment_filenames[camera_id]}")
# 清空当前 segment 状态
self.video_writers.pop(camera_id, None)
self.video_frame_counts.pop(camera_id, None)
self.video_segment_start_ts.pop(camera_id, None)
self.video_segment_filenames.pop(camera_id, None)
def _encode_image_to_base64(self, image) -> str:
ok, buf = cv2.imencode(".jpg", image)
if not ok:
raise RuntimeError("Failed to encode image to JPEG")
return base64.b64encode(buf.tobytes()).decode("ascii")
def run(self):
print("[INFO] FrameProcessorWorker started")
target_interval = 1.0 / RTSP_TARGET_FPS
while not self.stop_event.is_set():
try:
item = self.raw_frame_queue.get(timeout=0.5)
except queue.Empty:
continue
camera_id = item["camera_id"]
ts = item["timestamp"]
frame = item["frame"]
last_ts = self.last_process_ts.get(camera_id, 0.0)
if ts - last_ts < target_interval:
# 丢弃多余帧保证约10fps
self.raw_frame_queue.task_done()
continue
self.last_process_ts[camera_id] = ts
# 1) 写入 mp4 (当前segment)
writer, video_filepath = self._get_video_writer(camera_id, frame)
writer.write(frame)
self.video_frame_counts[camera_id] = self.video_frame_counts.get(camera_id, 0) + 1
# 2) 调用用户自定义处理逻辑
result = user_process_frame(frame, camera_id, ts)
if result is not None and "image" in result and "type" in result:
result_img = result["image"]
result_type = int(result["type"])
# 3) 通过 WebSocket 发送帧结果
try:
img_b64 = self._encode_image_to_base64(result_img)
except Exception as e:
print(f"[ERROR] Encode image failed: {e}")
img_b64 = None
if img_b64 is not None:
msg = {
"msg_type": "frame",
"camera_id": camera_id,
"timestamp": ts,
"result_type": result_type,
"image_base64": img_b64,
}
try:
self.ws_send_queue.put(msg, timeout=1.0)
except queue.Full:
print("[WARN] ws_send_queue full, drop frame message")
# 4) 如果 result_type != 0通过 WebSocket 发送告警
if result_type != 0:
alert_msg = {
"msg_type": "alert",
"camera_id": camera_id,
"event_type": result_type,
"video_file": video_filepath,
"timestamp": ts,
}
try:
self.ws_send_queue.put(alert_msg, timeout=1.0)
except queue.Full:
print("[WARN] ws_send_queue full, drop alert message")
# 5) 检查是否需要切换到下一个 mp4 segment
self._close_segment_if_needed(camera_id)
self.raw_frame_queue.task_done()
# 退出时,关闭所有 VideoWriter
for cam_id, writer in list(self.video_writers.items()):
writer.release()
print(f"[INFO] Release writer on exit: camera={cam_id}")
print("[INFO] FrameProcessorWorker stopped")
# =========================
# 用户自定义函数 (TBD)
# =========================
def user_process_frame(image, camera_id: int, timestamp: float) -> Dict[str, Any]:
"""
你在这里实现算法逻辑:
- image: numpy.ndarray, BGR
- camera_id: 摄像头 id
- timestamp: 捕获时间戳 (time.time())
返回:
- {"image": image, "type": int}
"""
# TODO: 替换为你的实际逻辑,例如模型推理
result_type = 0 # 示例默认0
return {
"image": image,
"type": result_type,
}
# =========================
# 服务封装
# =========================
class RTSPService:
def __init__(self, config_path: str):
self.config_path = config_path
self.cameras = self._load_config()
self.stop_event = threading.Event()
# 队列
self.raw_frame_queue: "queue.Queue[Dict[str, Any]]" = queue.Queue(maxsize=500)
self.ws_send_queue: "queue.Queue[Dict[str, Any]]" = queue.Queue(maxsize=1000)
# 线程
self.capture_workers = []
self.frame_processor = FrameProcessorWorker(self.raw_frame_queue, self.ws_send_queue, self.stop_event)
self.ws_sender = WebSocketSender(self.ws_send_queue, self.stop_event)
def _load_config(self):
with open(self.config_path, "r", encoding="utf-8") as f:
cfg = yaml.safe_load(f)
cameras_cfg = cfg.get("cameras", [])
cameras = []
for c in cameras_cfg:
cameras.append(
CameraConfig(
id=int(c["id"]),
name=str(c.get("name", f"cam_{c['id']}")),
rtsp_url=str(c["rtsp_url"]),
)
)
return cameras
def start(self):
print("[INFO] RTSPService starting...")
# 启动 WebSocket 发送线程
self.ws_sender.start()
# 启动帧处理线程
self.frame_processor.start()
# 启动每个摄像头的抓流线程
for cam in self.cameras:
w = RTSPCaptureWorker(cam, self.raw_frame_queue, self.stop_event)
w.start()
self.capture_workers.append(w)
print("[INFO] RTSPService started")
def stop(self):
print("[INFO] RTSPService stopping...")
self.stop_event.set()
# 等待队列处理完(可选)
try:
self.raw_frame_queue.join()
self.ws_send_queue.join()
except Exception:
pass
for w in self.capture_workers:
w.join(timeout=1.0)
self.frame_processor.join(timeout=1.0)
self.ws_sender.join(timeout=1.0)
print("[INFO] RTSPService stopped")
def main():
service = RTSPService(config_path="config.yaml")
service.start()
try:
while True:
time.sleep(1.0)
except KeyboardInterrupt:
print("[INFO] KeyboardInterrupt, shutting down...")
finally:
service.stop()
if __name__ == "__main__":
main()