407 lines
14 KiB
Python
407 lines
14 KiB
Python
import cv2
|
||
import time
|
||
import threading
|
||
import queue
|
||
import yaml
|
||
import os
|
||
import json
|
||
import base64
|
||
import asyncio
|
||
import websockets
|
||
from dataclasses import dataclass
|
||
from typing import Optional, Dict, Any, Tuple
|
||
|
||
|
||
# =========================
|
||
# 配置与数据结构
|
||
# =========================
|
||
|
||
@dataclass
|
||
class CameraConfig:
|
||
id: int
|
||
name: str
|
||
rtsp_url: str
|
||
|
||
|
||
RTSP_TARGET_FPS = 10.0 # 固定 10 帧/秒
|
||
FRAMES_PER_SEGMENT = 600 # 每 600 帧一个 mp4
|
||
VIDEO_OUTPUT_DIR = "./videos" # 视频输出目录
|
||
WS_HOST = "0.0.0.0" # WebSocket 服务端监听地址
|
||
WS_PORT = 8765 # WebSocket 服务端端口
|
||
|
||
# 已连接的 WebSocket 客户端集合
|
||
ws_clients = set()
|
||
|
||
|
||
# =========================
|
||
# WebSocket 服务线程
|
||
# =========================
|
||
|
||
class WebSocketSender(threading.Thread):
|
||
"""
|
||
WebSocket 服务端线程:
|
||
- 在 WS_HOST:WS_PORT 上启动 websockets 服务器
|
||
- 从 send_queue 中读取消息,广播给所有已连接客户端
|
||
"""
|
||
def __init__(self, send_queue: "queue.Queue[Dict[str, Any]]", stop_event: threading.Event):
|
||
super().__init__(daemon=True)
|
||
self.send_queue = send_queue
|
||
self.stop_event = stop_event
|
||
|
||
async def _ws_handler(self, websocket):
|
||
# 新客户端连接
|
||
ws_clients.add(websocket)
|
||
try:
|
||
async for _ in websocket:
|
||
# 当前忽略客户端发送的消息
|
||
pass
|
||
finally:
|
||
# 客户端断开
|
||
ws_clients.discard(websocket)
|
||
|
||
async def _broadcaster(self):
|
||
"""从队列中取出消息并广播给所有连接的客户端"""
|
||
while not self.stop_event.is_set():
|
||
try:
|
||
# 在线程池中阻塞等待队列消息
|
||
msg = await asyncio.to_thread(self.send_queue.get, timeout=0.5)
|
||
except queue.Empty:
|
||
continue
|
||
|
||
data = json.dumps(msg)
|
||
dead = []
|
||
for ws in list(ws_clients):
|
||
try:
|
||
await ws.send(data)
|
||
except Exception:
|
||
dead.append(ws)
|
||
for ws in dead:
|
||
ws_clients.discard(ws)
|
||
|
||
self.send_queue.task_done()
|
||
|
||
async def _run_async(self):
|
||
async with websockets.serve(self._ws_handler, WS_HOST, WS_PORT):
|
||
print(f"[INFO] WebSocket server started at ws://{WS_HOST}:{WS_PORT}")
|
||
await self._broadcaster()
|
||
|
||
def run(self):
|
||
asyncio.run(self._run_async())
|
||
|
||
|
||
# =========================
|
||
# RTSP 抓流线程
|
||
# =========================
|
||
|
||
class RTSPCaptureWorker(threading.Thread):
|
||
"""
|
||
只负责从 RTSP 读取原始帧,放入 raw_frame_queue。
|
||
不负责抽帧、不负责写视频。
|
||
"""
|
||
def __init__(
|
||
self,
|
||
camera_cfg: CameraConfig,
|
||
raw_frame_queue: "queue.Queue[Dict[str, Any]]",
|
||
stop_event: threading.Event,
|
||
):
|
||
super().__init__(daemon=True)
|
||
self.camera_cfg = camera_cfg
|
||
self.raw_frame_queue = raw_frame_queue
|
||
self.stop_event = stop_event
|
||
|
||
def run(self):
|
||
cap = cv2.VideoCapture(self.camera_cfg.rtsp_url, cv2.CAP_FFMPEG)
|
||
|
||
if not cap.isOpened():
|
||
print(f"[ERROR] Cannot open RTSP stream: {self.camera_cfg.rtsp_url}")
|
||
return
|
||
|
||
print(f"[INFO] Start capturing: id={self.camera_cfg.id}, name={self.camera_cfg.name}")
|
||
|
||
while not self.stop_event.is_set():
|
||
ok, frame = cap.read()
|
||
if not ok:
|
||
print(f"[WARN] Failed to read frame from camera {self.camera_cfg.id}, retrying...")
|
||
time.sleep(0.2)
|
||
continue
|
||
|
||
ts = time.time()
|
||
item = {
|
||
"camera_id": self.camera_cfg.id,
|
||
"camera_name": self.camera_cfg.name,
|
||
"timestamp": ts,
|
||
"frame": frame,
|
||
}
|
||
|
||
try:
|
||
self.raw_frame_queue.put(item, timeout=1.0)
|
||
except queue.Full:
|
||
print(f"[WARN] Raw frame queue full, drop frame from camera {self.camera_cfg.id}")
|
||
|
||
cap.release()
|
||
print(f"[INFO] Stop capturing: id={self.camera_cfg.id}")
|
||
|
||
|
||
# =========================
|
||
# 帧处理线程(抽帧 + 写mp4 + 调用用户函数 + 发WebSocket消息)
|
||
# =========================
|
||
|
||
class FrameProcessorWorker(threading.Thread):
|
||
def __init__(
|
||
self,
|
||
raw_frame_queue: "queue.Queue[Dict[str, Any]]",
|
||
ws_send_queue: "queue.Queue[Dict[str, Any]]",
|
||
stop_event: threading.Event,
|
||
):
|
||
super().__init__(daemon=True)
|
||
self.raw_frame_queue = raw_frame_queue
|
||
self.ws_send_queue = ws_send_queue
|
||
self.stop_event = stop_event
|
||
|
||
# 每个摄像头独立维护视频写入状态
|
||
self.video_writers: Dict[int, cv2.VideoWriter] = {}
|
||
self.video_frame_counts: Dict[int, int] = {}
|
||
self.video_segment_start_ts: Dict[int, float] = {}
|
||
self.video_segment_filenames: Dict[int, str] = {}
|
||
|
||
os.makedirs(VIDEO_OUTPUT_DIR, exist_ok=True)
|
||
|
||
# 控制 10fps 抽帧:记录每个摄像头上次处理时间
|
||
self.last_process_ts: Dict[int, float] = {}
|
||
|
||
def _get_video_writer(self, camera_id: int, frame) -> Tuple[cv2.VideoWriter, str]:
|
||
"""
|
||
获取(或新建)当前摄像头的 VideoWriter。
|
||
如果当前 segment 不存在,则新建一个,文件名由第一帧时间命名。
|
||
"""
|
||
writer = self.video_writers.get(camera_id)
|
||
if writer is not None:
|
||
return writer, self.video_segment_filenames[camera_id]
|
||
|
||
h, w = frame.shape[:2]
|
||
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
|
||
|
||
start_ts = time.time()
|
||
self.video_segment_start_ts[camera_id] = start_ts
|
||
|
||
ts_str = time.strftime("%Y%m%d_%H%M%S", time.localtime(start_ts))
|
||
filename = f"{ts_str}_cam{camera_id}.mp4"
|
||
filepath = os.path.join(VIDEO_OUTPUT_DIR, filename)
|
||
|
||
writer = cv2.VideoWriter(filepath, fourcc, RTSP_TARGET_FPS, (w, h))
|
||
self.video_writers[camera_id] = writer
|
||
self.video_frame_counts[camera_id] = 0
|
||
self.video_segment_filenames[camera_id] = filepath
|
||
|
||
print(f"[INFO] Start new segment: camera={camera_id}, file={filepath}")
|
||
return writer, filepath
|
||
|
||
def _close_segment_if_needed(self, camera_id: int):
|
||
"""
|
||
如果当前segment达到 FRAMES_PER_SEGMENT,则关闭并清理状态。
|
||
"""
|
||
count = self.video_frame_counts.get(camera_id, 0)
|
||
if count >= FRAMES_PER_SEGMENT:
|
||
writer = self.video_writers.get(camera_id)
|
||
if writer is not None:
|
||
writer.release()
|
||
print(f"[INFO] Close segment: camera={camera_id}, file={self.video_segment_filenames[camera_id]}")
|
||
|
||
# 清空当前 segment 状态
|
||
self.video_writers.pop(camera_id, None)
|
||
self.video_frame_counts.pop(camera_id, None)
|
||
self.video_segment_start_ts.pop(camera_id, None)
|
||
self.video_segment_filenames.pop(camera_id, None)
|
||
|
||
def _encode_image_to_base64(self, image) -> str:
|
||
ok, buf = cv2.imencode(".jpg", image)
|
||
if not ok:
|
||
raise RuntimeError("Failed to encode image to JPEG")
|
||
return base64.b64encode(buf.tobytes()).decode("ascii")
|
||
|
||
def run(self):
|
||
print("[INFO] FrameProcessorWorker started")
|
||
target_interval = 1.0 / RTSP_TARGET_FPS
|
||
|
||
while not self.stop_event.is_set():
|
||
try:
|
||
item = self.raw_frame_queue.get(timeout=0.5)
|
||
except queue.Empty:
|
||
continue
|
||
|
||
camera_id = item["camera_id"]
|
||
ts = item["timestamp"]
|
||
frame = item["frame"]
|
||
|
||
last_ts = self.last_process_ts.get(camera_id, 0.0)
|
||
if ts - last_ts < target_interval:
|
||
# 丢弃多余帧,保证约10fps
|
||
self.raw_frame_queue.task_done()
|
||
continue
|
||
self.last_process_ts[camera_id] = ts
|
||
|
||
# 1) 写入 mp4 (当前segment)
|
||
writer, video_filepath = self._get_video_writer(camera_id, frame)
|
||
writer.write(frame)
|
||
self.video_frame_counts[camera_id] = self.video_frame_counts.get(camera_id, 0) + 1
|
||
|
||
# 2) 调用用户自定义处理逻辑
|
||
result = user_process_frame(frame, camera_id, ts)
|
||
|
||
if result is not None and "image" in result and "type" in result:
|
||
result_img = result["image"]
|
||
result_type = int(result["type"])
|
||
|
||
# 3) 通过 WebSocket 发送帧结果
|
||
try:
|
||
img_b64 = self._encode_image_to_base64(result_img)
|
||
except Exception as e:
|
||
print(f"[ERROR] Encode image failed: {e}")
|
||
img_b64 = None
|
||
|
||
if img_b64 is not None:
|
||
msg = {
|
||
"msg_type": "frame",
|
||
"camera_id": camera_id,
|
||
"timestamp": ts,
|
||
"result_type": result_type,
|
||
"image_base64": img_b64,
|
||
}
|
||
try:
|
||
self.ws_send_queue.put(msg, timeout=1.0)
|
||
except queue.Full:
|
||
print("[WARN] ws_send_queue full, drop frame message")
|
||
|
||
# 4) 如果 result_type != 0,通过 WebSocket 发送告警
|
||
if result_type != 0:
|
||
alert_msg = {
|
||
"msg_type": "alert",
|
||
"camera_id": camera_id,
|
||
"event_type": result_type,
|
||
"video_file": video_filepath,
|
||
"timestamp": ts,
|
||
}
|
||
try:
|
||
self.ws_send_queue.put(alert_msg, timeout=1.0)
|
||
except queue.Full:
|
||
print("[WARN] ws_send_queue full, drop alert message")
|
||
|
||
# 5) 检查是否需要切换到下一个 mp4 segment
|
||
self._close_segment_if_needed(camera_id)
|
||
|
||
self.raw_frame_queue.task_done()
|
||
|
||
# 退出时,关闭所有 VideoWriter
|
||
for cam_id, writer in list(self.video_writers.items()):
|
||
writer.release()
|
||
print(f"[INFO] Release writer on exit: camera={cam_id}")
|
||
print("[INFO] FrameProcessorWorker stopped")
|
||
|
||
|
||
# =========================
|
||
# 用户自定义函数 (TBD)
|
||
# =========================
|
||
|
||
def user_process_frame(image, camera_id: int, timestamp: float) -> Dict[str, Any]:
|
||
"""
|
||
你在这里实现算法逻辑:
|
||
- image: numpy.ndarray, BGR
|
||
- camera_id: 摄像头 id
|
||
- timestamp: 捕获时间戳 (time.time())
|
||
返回:
|
||
- {"image": image, "type": int}
|
||
"""
|
||
# TODO: 替换为你的实际逻辑,例如模型推理
|
||
result_type = 0 # 示例:默认0
|
||
|
||
return {
|
||
"image": image,
|
||
"type": result_type,
|
||
}
|
||
|
||
|
||
# =========================
|
||
# 服务封装
|
||
# =========================
|
||
|
||
class RTSPService:
|
||
def __init__(self, config_path: str):
|
||
self.config_path = config_path
|
||
self.cameras = self._load_config()
|
||
|
||
self.stop_event = threading.Event()
|
||
|
||
# 队列
|
||
self.raw_frame_queue: "queue.Queue[Dict[str, Any]]" = queue.Queue(maxsize=500)
|
||
self.ws_send_queue: "queue.Queue[Dict[str, Any]]" = queue.Queue(maxsize=1000)
|
||
|
||
# 线程
|
||
self.capture_workers = []
|
||
self.frame_processor = FrameProcessorWorker(self.raw_frame_queue, self.ws_send_queue, self.stop_event)
|
||
self.ws_sender = WebSocketSender(self.ws_send_queue, self.stop_event)
|
||
|
||
def _load_config(self):
|
||
with open(self.config_path, "r", encoding="utf-8") as f:
|
||
cfg = yaml.safe_load(f)
|
||
cameras_cfg = cfg.get("cameras", [])
|
||
cameras = []
|
||
for c in cameras_cfg:
|
||
cameras.append(
|
||
CameraConfig(
|
||
id=int(c["id"]),
|
||
name=str(c.get("name", f"cam_{c['id']}")),
|
||
rtsp_url=str(c["rtsp_url"]),
|
||
)
|
||
)
|
||
return cameras
|
||
|
||
def start(self):
|
||
print("[INFO] RTSPService starting...")
|
||
|
||
# 启动 WebSocket 发送线程
|
||
self.ws_sender.start()
|
||
|
||
# 启动帧处理线程
|
||
self.frame_processor.start()
|
||
|
||
# 启动每个摄像头的抓流线程
|
||
for cam in self.cameras:
|
||
w = RTSPCaptureWorker(cam, self.raw_frame_queue, self.stop_event)
|
||
w.start()
|
||
self.capture_workers.append(w)
|
||
|
||
print("[INFO] RTSPService started")
|
||
|
||
def stop(self):
|
||
print("[INFO] RTSPService stopping...")
|
||
self.stop_event.set()
|
||
|
||
# 等待队列处理完(可选)
|
||
try:
|
||
self.raw_frame_queue.join()
|
||
self.ws_send_queue.join()
|
||
except Exception:
|
||
pass
|
||
|
||
for w in self.capture_workers:
|
||
w.join(timeout=1.0)
|
||
self.frame_processor.join(timeout=1.0)
|
||
self.ws_sender.join(timeout=1.0)
|
||
|
||
print("[INFO] RTSPService stopped")
|
||
|
||
|
||
def main():
|
||
service = RTSPService(config_path="config.yaml")
|
||
service.start()
|
||
try:
|
||
while True:
|
||
time.sleep(1.0)
|
||
except KeyboardInterrupt:
|
||
print("[INFO] KeyboardInterrupt, shutting down...")
|
||
finally:
|
||
service.stop()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |