上传警告信息改为传视频,已测试保存视频,未与接口联调

This commit is contained in:
zqc
2026-03-05 14:41:49 +08:00
parent 6a1013d138
commit 80bc288a88
4 changed files with 386 additions and 7 deletions

View File

@@ -4,6 +4,9 @@
import cv2
import base64
import json
import os
import subprocess
import time
import threading
import queue
@@ -13,6 +16,7 @@ from concurrent.futures import ThreadPoolExecutor
from common import constants
from utils.logger import get_logger
from utils.hls_utils import get_segments_before_current, parse_segment_info
logger = get_logger(__name__)
@@ -67,6 +71,12 @@ class BaseFrameProcessorWorker(threading.Thread):
max_workers=post_workers,
thread_name_prefix="alert_post"
)
# MP4缓存 {segment_path: mp4_path}
self._mp4_cache: Dict[str, str] = {}
# 启动视频文件清理线程
self._start_cleanup_thread()
def _encode_image_to_base64(self, img) -> str:
"""图像编码为 Base64"""
@@ -102,7 +112,7 @@ class BaseFrameProcessorWorker(threading.Thread):
return result
def _post_alert(self, msg: dict):
"""异步发送告警 POST 请求(在线程池中执行)"""
"""异步发送告警 POST 请求(在线程池中执行)- 旧接口,保留备用"""
try:
response = requests.post(constants.ALERT_PUSH_URL, json=msg, timeout=5.0)
if response.status_code == 200:
@@ -112,6 +122,218 @@ class BaseFrameProcessorWorker(threading.Thread):
except Exception as e:
print(f"[ERROR] POST alert request failed: {e}")
def _post_alert_with_video(self, msg: dict, video_path: str = None):
"""
异步发送告警 POST 请求带视频multipart/form-data
Args:
msg: 消息内容
video_path: 视频文件路径(可选)
"""
try:
if video_path and os.path.exists(video_path):
# 有视频,使用 multipart/form-data 上传
with open(video_path, 'rb') as f:
files = {
'video': f,
'metadata': (None, json.dumps(msg))
}
response = requests.post(constants.ALERT_PUSH_URL, files=files, timeout=10.0)
else:
# 无视频,也使用 multipart/form-data
files = {
'metadata': (None, json.dumps(msg))
}
response = requests.post(constants.ALERT_PUSH_URL, files=files, timeout=5.0)
if response.status_code == 200:
logger.info(f"[INFO] POST alert sent successfully for actions: {msg.get('result_type')}")
else:
logger.warning(f"[WARN] POST alert failed with status: {response.status_code}")
except Exception as e:
logger.error(f"[ERROR] POST alert request failed: {e}")
def _start_cleanup_thread(self):
"""启动视频文件清理线程"""
def cleanup_loop():
while not self.stop_event.is_set():
try:
self._cleanup_expired_files()
except Exception as e:
logger.error(f"[ERROR] Cleanup thread error: {e}")
# 每10分钟检查一次
self.stop_event.wait(600)
thread = threading.Thread(target=cleanup_loop, daemon=True, name="video_cleanup")
thread.start()
logger.info("[INFO] Video cleanup thread started")
def _cleanup_expired_files(self):
"""清理过期的视频文件"""
output_dir = constants.VIDEO_CLIP_OUTPUT_DIR
if not output_dir or not os.path.exists(output_dir):
return
retention_seconds = constants.VIDEO_CLIP_RETENTION_SECONDS
current_time = time.time()
try:
for filename in os.listdir(output_dir):
if not filename.endswith('.mp4'):
continue
filepath = os.path.join(output_dir, filename)
if os.path.isfile(filepath):
file_mtime = os.path.getmtime(filepath)
if current_time - file_mtime > retention_seconds:
try:
os.remove(filepath)
logger.info(f"[INFO] Cleaned up expired video: {filename}")
except Exception as e:
logger.error(f"[ERROR] Failed to delete {filename}: {e}")
except Exception as e:
logger.error(f"[ERROR] Cleanup expired files error: {e}")
def _create_or_get_video_clip(self, segment_path: str, segment_duration: float = None) -> str | None:
"""
创建或获取视频剪辑
Args:
segment_path: 当前TS分片路径
segment_duration: 当前分片时长(秒)
Returns:
MP4文件路径失败返回 None
"""
if not segment_path:
return None
# 检查缓存
if segment_path in self._mp4_cache:
cached_path = self._mp4_cache[segment_path]
if os.path.exists(cached_path):
return cached_path
else:
# 缓存失效,移除
del self._mp4_cache[segment_path]
# 解析分片信息构建MP4路径
camera_id, timestamp, seq = parse_segment_info(segment_path)
if not camera_id:
logger.warning(f"[WARN] Failed to parse segment info: {segment_path}")
return None
output_dir = constants.VIDEO_CLIP_OUTPUT_DIR
if not output_dir:
logger.warning("[WARN] VIDEO_CLIP_OUTPUT_DIR not configured")
return None
# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)
# MP4文件名
mp4_filename = f"{camera_id}_{timestamp}_{seq}.mp4"
mp4_path = os.path.join(output_dir, mp4_filename)
# 检查是否已存在
if os.path.exists(mp4_path):
self._mp4_cache[segment_path] = mp4_path
return mp4_path
# 计算需要回溯的分片数量
clip_duration = constants.VIDEO_CLIP_DURATION_SECONDS
default_segment_duration = constants.VIDEO_CLIP_DEFAULT_SEGMENT_DURATION
effective_duration = segment_duration if segment_duration else default_segment_duration
if effective_duration <= 0:
effective_duration = default_segment_duration
n_segments = int(clip_duration / effective_duration) + 1
# 获取需要合并的分片
ts_files = get_segments_before_current(segment_path, n_segments)
if not ts_files:
logger.warning(f"[WARN] No segments found for clip: {segment_path}")
return None
# 合并TS为MP4
if self._merge_ts_to_mp4(ts_files, mp4_path):
self._mp4_cache[segment_path] = mp4_path
return mp4_path
return None
def _merge_ts_to_mp4(self, ts_files: list, output_path: str) -> bool:
"""
使用 ffmpeg 合并 TS 分片为 MP4
Args:
ts_files: TS文件路径列表按时间顺序
output_path: 输出MP4路径
Returns:
是否成功
"""
if not ts_files:
return False
try:
# 构建 concat 字符串
concat_str = "|".join(ts_files)
# ffmpeg 命令
cmd = [
'ffmpeg',
'-i', f'concat:{concat_str}',
'-c', 'copy',
'-y', # 覆盖输出文件
output_path
]
# 执行命令
result = subprocess.run(
cmd,
capture_output=True,
timeout=60 # 60秒超时
)
if result.returncode == 0:
logger.info(f"[INFO] Created video clip: {output_path}")
return True
else:
logger.error(f"[ERROR] ffmpeg failed: {result.stderr.decode()}")
return False
except subprocess.TimeoutExpired:
logger.error(f"[ERROR] ffmpeg timeout for {output_path}")
return False
except FileNotFoundError:
logger.error("[ERROR] ffmpeg not found, please install ffmpeg")
return False
except Exception as e:
logger.error(f"[ERROR] Failed to merge TS files: {e}")
return False
def _process_alert_with_video(self, msg: dict, segment_path: str, segment_duration: float):
"""
处理告警(含视频剪辑)- 在线程池中执行
Args:
msg: 基础消息
segment_path: 当前TS分片路径
segment_duration: 当前分片时长
"""
# 尝试创建/获取视频剪辑
mp4_path = None
if segment_path:
mp4_path = self._create_or_get_video_clip(segment_path, segment_duration)
# 展开 result_type
expanded_msgs = self._expand_msg_by_result_type(msg)
# 发送每个展开后的消息
for expanded_msg in expanded_msgs:
self._post_alert_with_video(expanded_msg, mp4_path)
def _create_detector(self, params):
"""创建检测器实例"""
# 使用 type(self) 访问类属性,避免 lambda 被绑定 self 参数
@@ -203,13 +425,24 @@ class BaseFrameProcessorWorker(threading.Thread):
try:
self.ws_queue.put(msg, timeout=1.0)
if push_actions and len(push_actions) > 0:
# 异步发送 POST 请求(提交到线程池)
# 构建消息
post_msg = msg.copy()
post_msg['type'] = self.POST_TYPE
# 展开 result_type 为多个独立的 msg
expanded_msgs = self._expand_msg_by_result_type(post_msg)
for expanded_msg in expanded_msgs:
self.post_executor.submit(self._post_alert, expanded_msg)
#备用backup
#self.post_executor.submit(self._post_alert, post_msg)
# 获取视频相关信息仅HLS模式有
segment_path = item.get("segment_path")
segment_duration = item.get("segment_duration")
# 提交到线程池执行包含视频剪辑和POST
self.post_executor.submit(
self._process_alert_with_video,
post_msg,
segment_path,
segment_duration
)
except queue.Full:
logger.warning("[WARN] ws_send_queue full, drop frame message")

View File

@@ -9,6 +9,12 @@ HLS_ROOT_PATH = ""
HLS_SEGMENT_PATTERN = "segment_%09d.ts" # TS文件命名模式
# 视频剪辑配置
VIDEO_CLIP_OUTPUT_DIR = ""
VIDEO_CLIP_DURATION_SECONDS = 30
VIDEO_CLIP_RETENTION_SECONDS = 3600
VIDEO_CLIP_DEFAULT_SEGMENT_DURATION = 2
def init_config(config_path: str = "config.yaml"):
"""
@@ -18,6 +24,7 @@ def init_config(config_path: str = "config.yaml"):
config_path: 配置文件路径,默认为 config.yaml
"""
global ALERT_PUSH_URL, HLS_ROOT_PATH
global VIDEO_CLIP_OUTPUT_DIR, VIDEO_CLIP_DURATION_SECONDS, VIDEO_CLIP_RETENTION_SECONDS, VIDEO_CLIP_DEFAULT_SEGMENT_DURATION
try:
with open(config_path, "r", encoding="utf-8") as f:
@@ -25,6 +32,13 @@ def init_config(config_path: str = "config.yaml"):
ALERT_PUSH_URL = cfg.get("alert_push_url", "")
# HLS_ROOT_PATH = cfg.get("hls_root_path", "")
# 视频剪辑配置
VIDEO_CLIP_OUTPUT_DIR = cfg.get("video_clip_output_dir", "")
VIDEO_CLIP_DURATION_SECONDS = cfg.get("video_clip_duration_seconds", 30)
VIDEO_CLIP_RETENTION_SECONDS = cfg.get("video_clip_retention_seconds", 3600)
VIDEO_CLIP_DEFAULT_SEGMENT_DURATION = cfg.get("video_clip_default_segment_duration", 2)
logger.info(f"[INFO] Config initialized from {config_path}, alert_push_url={ALERT_PUSH_URL}")
except Exception as e:

View File

@@ -41,6 +41,12 @@ hls_downloader_daily_rotate_hour: 3 # 凌晨轮换时间
hls_downloader_retention_days: 3 # 文件保留天数
hls_downloader_retry_interval_seconds: 10 # 重试等待秒数
# 视频剪辑配置
video_clip_output_dir: "D:/ProjectDoc/Police/data/video_clips" # 视频剪辑输出目录
video_clip_duration_seconds: 30 # 回溯时长(秒)
video_clip_retention_seconds: 3600 # 视频文件保留时长(秒)
video_clip_default_segment_duration: 2 # 默认分片时长fallback
service_groups:
- name: "kadian_group" # 服务组名称
video_source_type: "hls"
@@ -49,7 +55,7 @@ service_groups:
algorithm: "checkpoint" # 算法类型
cameras: # 该组下的摄像头列表
- id: 8
index: 12345
index: "12345"
name: Entrance
params:
roi_points:

View File

@@ -1,5 +1,6 @@
import os
import glob
import re
from common import constants
@@ -67,3 +68,128 @@ def get_latest_n_segments_by_camera_id(camera_id: str, n: int) -> list:
"""
camera_root_dir = os.path.join(constants.HLS_ROOT_PATH, camera_id)
return get_latest_n_segments(camera_root_dir, n)
def parse_segment_info(segment_path: str) -> tuple:
"""
从TS分片路径解析出 camera_id, timestamp, sequence
Args:
segment_path: TS分片路径格式如: hls_root_path/camera_id/timestamp/segment_xxxxx.ts
Returns:
(camera_id, timestamp, sequence) 或 (None, None, None) 解析失败时
"""
try:
# 获取文件名和目录结构
# 路径格式: .../camera_id/timestamp/segment_00001.ts
abs_path = os.path.abspath(segment_path)
parts = abs_path.split(os.sep)
# 从后往前找
# parts[-1] = segment_00001.ts
# parts[-2] = timestamp
# parts[-3] = camera_id
if len(parts) < 3:
return None, None, None
# 解析序号
filename = parts[-1]
match = re.search(r'segment_(\d+)\.ts$', filename)
if not match:
return None, None, None
sequence = match.group(1)
# 时间戳
timestamp = parts[-2]
# camera_id
camera_id = parts[-3]
return camera_id, timestamp, sequence
except Exception:
return None, None, None
def get_segments_before_current(current_segment_path: str, n: int) -> list:
"""
获取当前分片之前的n个分片包括当前分片
Args:
current_segment_path: 当前TS分片路径
n: 需要获取的分片数量
Returns:
分片路径列表按时间顺序旧的在前如果不够n个则返回现有的
"""
if not current_segment_path or not os.path.exists(current_segment_path):
return []
# 解析当前分片信息
camera_id, timestamp, current_seq = parse_segment_info(current_segment_path)
if not camera_id:
return []
# 构建摄像头根目录和时间戳目录
camera_root_dir = os.path.join(constants.HLS_ROOT_PATH, camera_id)
timestamp_dir = os.path.join(camera_root_dir, timestamp)
if not os.path.exists(timestamp_dir):
return []
# 获取当前时间戳文件夹下的所有分片
pattern = os.path.join(timestamp_dir, "segment_*.ts")
segment_files = glob.glob(pattern)
# 按分片序号排序
segment_files.sort(key=lambda x: int(os.path.basename(x).split('_')[-1].split('.')[0]))
# 找到当前分片的位置
try:
current_index = segment_files.index(current_segment_path)
except ValueError:
# 当前分片不在列表中
return []
# 计算起始位置
start_index = max(0, current_index - n + 1)
# 返回从起始位置到当前位置的所有分片
result = segment_files[start_index:current_index + 1]
# 如果不够n个需要从之前的时间戳文件夹获取
if len(result) < n:
# 获取所有时间戳文件夹
timestamp_folders = []
for folder_name in os.listdir(camera_root_dir):
folder_path = os.path.join(camera_root_dir, folder_name)
if os.path.isdir(folder_path):
timestamp_folders.append(folder_name)
# 排序,找到当前时间戳之前的时间戳
timestamp_folders.sort()
try:
current_ts_index = timestamp_folders.index(timestamp)
except ValueError:
current_ts_index = len(timestamp_folders)
# 从之前的时间戳文件夹获取分片
needed_count = n - len(result)
for i in range(current_ts_index - 1, -1, -1):
prev_ts_dir = os.path.join(camera_root_dir, timestamp_folders[i])
prev_pattern = os.path.join(prev_ts_dir, "segment_*.ts")
prev_segments = glob.glob(prev_pattern)
prev_segments.sort(key=lambda x: int(os.path.basename(x).split('_')[-1].split('.')[0]))
# 取最后 needed_count 个
take_count = min(needed_count, len(prev_segments))
if take_count > 0:
result = prev_segments[-take_count:] + result
needed_count -= take_count
if needed_count <= 0:
break
return result